Merge "BUG 1623 - Clustering : Parsing Error thrown on startup"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
index 823d563a74e2d6d6ac7132f3adfed10b0a866170..610fdc987fde7a1a51491ef25dc2f764011b9eda 100644 (file)
@@ -9,17 +9,22 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
+import java.util.ArrayList;
+
 /**
  * The behavior of a RaftActor in the Follower state
  * <p/>
@@ -31,6 +36,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
  * </ul>
  */
 public class Follower extends AbstractRaftActorBehavior {
+    private ByteString snapshotChunksCollected = ByteString.EMPTY;
+
     public Follower(RaftActorContext context) {
         super(context);
 
@@ -40,20 +47,38 @@ public class Follower extends AbstractRaftActorBehavior {
     @Override protected RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
+        if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
+            context.getLogger()
+                .debug(appendEntries.toString());
+        }
+
+        // TODO : Refactor this method into a bunch of smaller methods
+        // to make it easier to read. Before refactoring ensure tests
+        // cover the code properly
+
+        // 1. Reply false if term < currentTerm (§5.1)
+        // This is handled in the appendEntries method of the base class
+
         // If we got here then we do appear to be talking to the leader
         leaderId = appendEntries.getLeaderId();
 
         // 2. Reply false if log doesn’t contain an entry at prevLogIndex
         // whose term matches prevLogTerm (§5.3)
+
         ReplicatedLogEntry previousEntry = context.getReplicatedLog()
             .get(appendEntries.getPrevLogIndex());
 
 
-        boolean noMatchingTerms = true;
+        boolean outOfSync = true;
 
+        // First check if the logs are in sync or not
         if (lastIndex() == -1
             && appendEntries.getPrevLogIndex() != -1) {
 
+            // The follower's log is out of sync because the leader does have
+            // an entry at prevLogIndex and this follower has no entries in
+            // it's log.
+
             context.getLogger().debug(
                 "The followers log is empty and the senders prevLogIndex is {}",
                 appendEntries.getPrevLogIndex());
@@ -62,6 +87,9 @@ public class Follower extends AbstractRaftActorBehavior {
             && appendEntries.getPrevLogIndex() != -1
             && previousEntry == null) {
 
+            // The follower's log is out of sync because the Leader's
+            // prevLogIndex entry was not found in it's log
+
             context.getLogger().debug(
                 "The log is not empty but the prevLogIndex {} was not found in it",
                 appendEntries.getPrevLogIndex());
@@ -70,15 +98,24 @@ public class Follower extends AbstractRaftActorBehavior {
             && previousEntry != null
             && previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) {
 
+            // The follower's log is out of sync because the Leader's
+            // prevLogIndex entry does exist in the follower's log but it has
+            // a different term in it
+
             context.getLogger().debug(
                 "Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
                 , previousEntry.getTerm()
                 , appendEntries.getPrevLogTerm());
         } else {
-            noMatchingTerms = false;
+            outOfSync = false;
         }
 
-        if (noMatchingTerms) {
+        if (outOfSync) {
+            // We found that the log was out of sync so just send a negative
+            // reply and return
+            context.getLogger().debug("Follower is out-of-sync, " +
+                "so sending negative reply, lastIndex():{}, lastTerm():{}",
+                lastIndex(), lastTerm());
             sender.tell(
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
                     lastIndex(), lastTerm()), actor()
@@ -98,33 +135,31 @@ public class Follower extends AbstractRaftActorBehavior {
             // follow it (§5.3)
             int addEntriesFrom = 0;
             if (context.getReplicatedLog().size() > 0) {
-                for (int i = 0;
-                     i < appendEntries.getEntries()
-                         .size(); i++, addEntriesFrom++) {
-                    ReplicatedLogEntry matchEntry =
-                        appendEntries.getEntries().get(i);
-                    ReplicatedLogEntry newEntry = context.getReplicatedLog()
-                        .get(matchEntry.getIndex());
+
+                // Find the entry up until which the one that is not in the follower's log
+                for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
+                    ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i);
+                    ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex());
 
                     if (newEntry == null) {
                         //newEntry not found in the log
                         break;
                     }
 
-                    if (newEntry != null && newEntry.getTerm() == matchEntry
+                    if (newEntry.getTerm() == matchEntry
                         .getTerm()) {
                         continue;
                     }
-                    if (newEntry != null && newEntry.getTerm() != matchEntry
-                        .getTerm()) {
-                        context.getLogger().debug(
-                            "Removing entries from log starting at "
-                                + matchEntry.getIndex()
-                        );
-                        context.getReplicatedLog()
-                            .removeFrom(matchEntry.getIndex());
-                        break;
-                    }
+
+                    context.getLogger().debug(
+                        "Removing entries from log starting at "
+                            + matchEntry.getIndex()
+                    );
+
+                    // Entries do not match so remove all subsequent entries
+                    context.getReplicatedLog()
+                        .removeFromAndPersist(matchEntry.getIndex());
+                    break;
                 }
             }
 
@@ -136,8 +171,10 @@ public class Follower extends AbstractRaftActorBehavior {
             // 4. Append any new entries not already in the log
             for (int i = addEntriesFrom;
                  i < appendEntries.getEntries().size(); i++) {
-                context.getLogger().debug(
-                    "Append entry to log " + appendEntries.getEntries().get(i).getData()
+
+                context.getLogger().info(
+                    "Append entry to log " + appendEntries.getEntries().get(
+                        i).getData()
                         .toString()
                 );
                 context.getReplicatedLog()
@@ -164,7 +201,13 @@ public class Follower extends AbstractRaftActorBehavior {
 
         // If commitIndex > lastApplied: increment lastApplied, apply
         // log[lastApplied] to state machine (§5.3)
-        if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
+        // check if there are any entries to be applied. last-applied can be equal to last-index
+        if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
+            context.getLastApplied() < lastIndex()) {
+            context.getLogger().debug("applyLogToStateMachine, " +
+                "appendEntries.getLeaderCommit():{}," +
+                "context.getLastApplied():{}, lastIndex():{}",
+                appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex());
             applyLogToStateMachine(appendEntries.getLeaderCommit());
         }
 
@@ -188,22 +231,26 @@ public class Follower extends AbstractRaftActorBehavior {
         return RaftState.Follower;
     }
 
-    @Override public RaftState handleMessage(ActorRef sender, Object message) {
+    @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+
+        Object message = fromSerializableMessage(originalMessage);
+
         if (message instanceof RaftRPC) {
             RaftRPC rpc = (RaftRPC) message;
             // If RPC request or response contains term T > currentTerm:
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
-                context.getTermInformation().update(rpc.getTerm(), null);
+                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
             }
         }
 
         if (message instanceof ElectionTimeout) {
             return RaftState.Candidate;
+
         } else if (message instanceof InstallSnapshot) {
-            InstallSnapshot snapshot = (InstallSnapshot) message;
-            actor().tell(new ApplySnapshot(snapshot), actor());
+            InstallSnapshot installSnapshot = (InstallSnapshot) message;
+            handleInstallSnapshot(sender, installSnapshot);
         }
 
         scheduleElection(electionDuration());
@@ -211,6 +258,47 @@ public class Follower extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
+    private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
+        context.getLogger().debug("InstallSnapshot received by follower " +
+            "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+            installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+
+        try {
+            if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
+                // this is the last chunk, create a snapshot object and apply
+
+                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+                context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}",
+                    snapshotChunksCollected.size());
+
+                Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
+                    new ArrayList<ReplicatedLogEntry>(),
+                    installSnapshot.getLastIncludedIndex(),
+                    installSnapshot.getLastIncludedTerm(),
+                    installSnapshot.getLastIncludedIndex(),
+                    installSnapshot.getLastIncludedTerm());
+
+                actor().tell(new ApplySnapshot(snapshot), actor());
+
+            } else {
+                // we have more to go
+                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+                context.getLogger().debug("Chunk={},snapshotChunksCollected.size:{}",
+                    installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
+            }
+
+            sender.tell(new InstallSnapshotReply(
+                currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
+                true), actor());
+
+        } catch (Exception e) {
+            context.getLogger().error("Exception in InstallSnapshot of follower", e);
+            //send reply with success as false. The chunk will be sent again on failure
+            sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+                installSnapshot.getChunkIndex(), false), actor());
+        }
+    }
+
     @Override public void close() throws Exception {
         stopElection();
     }