Install snapshot and Reply
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
index 54e0494b9da65305729afcf00686e3102c31dd00..610fdc987fde7a1a51491ef25dc2f764011b9eda 100644 (file)
@@ -9,17 +9,22 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
+import java.util.ArrayList;
+
 /**
  * The behavior of a RaftActor in the Follower state
  * <p/>
@@ -31,6 +36,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
  * </ul>
  */
 public class Follower extends AbstractRaftActorBehavior {
+    private ByteString snapshotChunksCollected = ByteString.EMPTY;
+
     public Follower(RaftActorContext context) {
         super(context);
 
@@ -106,6 +113,9 @@ public class Follower extends AbstractRaftActorBehavior {
         if (outOfSync) {
             // We found that the log was out of sync so just send a negative
             // reply and return
+            context.getLogger().debug("Follower is out-of-sync, " +
+                "so sending negative reply, lastIndex():{}, lastTerm():{}",
+                lastIndex(), lastTerm());
             sender.tell(
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
                     lastIndex(), lastTerm()), actor()
@@ -191,7 +201,13 @@ public class Follower extends AbstractRaftActorBehavior {
 
         // If commitIndex > lastApplied: increment lastApplied, apply
         // log[lastApplied] to state machine (ยง5.3)
-        if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
+        // check if there are any entries to be applied. last-applied can be equal to last-index
+        if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
+            context.getLastApplied() < lastIndex()) {
+            context.getLogger().debug("applyLogToStateMachine, " +
+                "appendEntries.getLeaderCommit():{}," +
+                "context.getLastApplied():{}, lastIndex():{}",
+                appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex());
             applyLogToStateMachine(appendEntries.getLeaderCommit());
         }
 
@@ -234,7 +250,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
         } else if (message instanceof InstallSnapshot) {
             InstallSnapshot installSnapshot = (InstallSnapshot) message;
-            actor().tell(new ApplySnapshot(installSnapshot.getData()), actor());
+            handleInstallSnapshot(sender, installSnapshot);
         }
 
         scheduleElection(electionDuration());
@@ -242,6 +258,47 @@ public class Follower extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
+    private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
+        context.getLogger().debug("InstallSnapshot received by follower " +
+            "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+            installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+
+        try {
+            if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
+                // this is the last chunk, create a snapshot object and apply
+
+                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+                context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}",
+                    snapshotChunksCollected.size());
+
+                Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
+                    new ArrayList<ReplicatedLogEntry>(),
+                    installSnapshot.getLastIncludedIndex(),
+                    installSnapshot.getLastIncludedTerm(),
+                    installSnapshot.getLastIncludedIndex(),
+                    installSnapshot.getLastIncludedTerm());
+
+                actor().tell(new ApplySnapshot(snapshot), actor());
+
+            } else {
+                // we have more to go
+                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+                context.getLogger().debug("Chunk={},snapshotChunksCollected.size:{}",
+                    installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
+            }
+
+            sender.tell(new InstallSnapshotReply(
+                currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
+                true), actor());
+
+        } catch (Exception e) {
+            context.getLogger().error("Exception in InstallSnapshot of follower", e);
+            //send reply with success as false. The chunk will be sent again on failure
+            sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+                installSnapshot.getChunkIndex(), false), actor());
+        }
+    }
+
     @Override public void close() throws Exception {
         stopElection();
     }