Introduce a mechanism for a Follower to signal it's sync up status
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
index 31b5efbe3878df73f74d46d90d018751dca67bbf..618865cb88eb8877cdcfdcfb29208c80707c2c0f 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
 import java.util.ArrayList;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -18,6 +17,7 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@@ -37,12 +37,18 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
  */
 public class Follower extends AbstractRaftActorBehavior {
 
+
+
     private SnapshotTracker snapshotTracker = null;
 
+    private final InitialSyncStatusTracker initialSyncStatusTracker;
+
     public Follower(RaftActorContext context) {
-        super(context);
+        super(context, RaftState.Follower);
 
         scheduleElection(electionDuration());
+
+        initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor());
     }
 
     private boolean isLogEntryPresent(long index){
@@ -72,13 +78,18 @@ public class Follower extends AbstractRaftActorBehavior {
         return -1;
     }
 
+    private void updateInitialSyncStatus(long currentLeaderCommit, String leaderId){
+        initialSyncStatusTracker.update(leaderId, currentLeaderCommit, context.getCommitIndex());
+    }
+
     @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
                                                               AppendEntries appendEntries) {
 
-        if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
-            }
+        int numLogEntries = appendEntries.getEntries() != null ? appendEntries.getEntries().size() : 0;
+        if(LOG.isTraceEnabled()) {
+            LOG.trace("{}: handleAppendEntries: {}", logName(), appendEntries);
+        } else if(LOG.isDebugEnabled() && numLogEntries > 0) {
+            LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
         }
 
         // TODO : Refactor this method into a bunch of smaller methods
@@ -97,48 +108,36 @@ public class Follower extends AbstractRaftActorBehavior {
         long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex());
         boolean prevEntryPresent = isLogEntryPresent(appendEntries.getPrevLogIndex());
 
+        updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
 
         boolean outOfSync = true;
 
         // First check if the logs are in sync or not
-        if (lastIndex() == -1
-                && appendEntries.getPrevLogIndex() != -1) {
+        long lastIndex = lastIndex();
+        if (lastIndex == -1 && appendEntries.getPrevLogIndex() != -1) {
 
             // The follower's log is out of sync because the leader does have
             // an entry at prevLogIndex and this follower has no entries in
             // it's log.
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
-                        context.getId(), appendEntries.getPrevLogIndex());
-            }
-
-        } else if (lastIndex() > -1
-                && appendEntries.getPrevLogIndex() != -1
-                && !prevEntryPresent) {
+            LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
+                        logName(), appendEntries.getPrevLogIndex());
+        } else if (lastIndex > -1 && appendEntries.getPrevLogIndex() != -1 && !prevEntryPresent) {
 
             // The follower's log is out of sync because the Leader's
             // prevLogIndex entry was not found in it's log
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it",
-                        context.getId(), appendEntries.getPrevLogIndex());
-            }
-
-        } else if (lastIndex() > -1
-                && prevEntryPresent
-                && prevLogTerm != appendEntries.getPrevLogTerm()) {
+            LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it",
+                        logName(), appendEntries.getPrevLogIndex());
+        } else if (lastIndex > -1 && prevEntryPresent && prevLogTerm != appendEntries.getPrevLogTerm()) {
 
             // The follower's log is out of sync because the Leader's
             // prevLogIndex entry does exist in the follower's log but it has
             // a different term in it
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(
-                        "{}: Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
-                        , context.getId(), prevLogTerm
-                        , appendEntries.getPrevLogTerm());
-            }
+            LOG.debug(
+                "{}: Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}",
+                 logName(), prevLogTerm, appendEntries.getPrevLogTerm());
         } else {
             outOfSync = false;
         }
@@ -146,25 +145,19 @@ public class Follower extends AbstractRaftActorBehavior {
         if (outOfSync) {
             // We found that the log was out of sync so just send a negative
             // reply and return
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Follower ({}) is out-of-sync, " +
-                        "so sending negative reply, lastIndex():{}, lastTerm():{}",
-                        context.getId(), context.getId(), lastIndex(), lastTerm()
-                );
-            }
-            sender.tell(
-                new AppendEntriesReply(context.getId(), currentTerm(), false,
-                    lastIndex(), lastTerm()), actor()
-            );
+
+            LOG.debug("{}: Follower is out-of-sync, so sending negative reply, lastIndex: {}, lastTerm: {}",
+                        logName(), lastIndex, lastTerm());
+
+            sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
+                    lastTerm()), actor());
             return this;
         }
 
-        if (appendEntries.getEntries() != null
-            && appendEntries.getEntries().size() > 0) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Number of entries to be appended = {}", context.getId(),
+        if (appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
+
+            LOG.debug("{}: Number of entries to be appended = {}", logName(),
                         appendEntries.getEntries().size());
-            }
 
             // 3. If an existing entry conflicts with a new one (same index
             // but different terms), delete the existing entry and all that
@@ -182,77 +175,76 @@ public class Follower extends AbstractRaftActorBehavior {
                         break;
                     }
 
-                    if (newEntry.getTerm() == matchEntry
-                        .getTerm()) {
+                    if (newEntry.getTerm() == matchEntry.getTerm()) {
                         continue;
                     }
 
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("{}: Removing entries from log starting at {}", context.getId(),
+                    LOG.debug("{}: Removing entries from log starting at {}", logName(),
                                 matchEntry.getIndex());
-                    }
 
                     // Entries do not match so remove all subsequent entries
-                    context.getReplicatedLog()
-                        .removeFromAndPersist(matchEntry.getIndex());
+                    context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex());
                     break;
                 }
             }
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: After cleanup entries to be added from = {}", context.getId(),
-                        (addEntriesFrom + lastIndex()));
-            }
+            lastIndex = lastIndex();
+            LOG.debug("{}: After cleanup entries to be added from = {}", logName(),
+                        (addEntriesFrom + lastIndex));
 
             // 4. Append any new entries not already in the log
-            for (int i = addEntriesFrom;
-                 i < appendEntries.getEntries().size(); i++) {
+            for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) {
+                ReplicatedLogEntry entry = appendEntries.getEntries().get(i);
 
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("{}: Append entry to log {}", context.getId(),
-                            appendEntries.getEntries().get(i).getData());
-                }
-                context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i));
-            }
+                LOG.debug("{}: Append entry to log {}", logName(), entry.getData());
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Log size is now {}", context.getId(), context.getReplicatedLog().size());
+                context.getReplicatedLog().appendAndPersist(entry);
             }
-        }
 
+            LOG.debug("{}: Log size is now {}", logName(), context.getReplicatedLog().size());
+        }
 
         // 5. If leaderCommit > commitIndex, set commitIndex =
         // min(leaderCommit, index of last new entry)
 
+        lastIndex = lastIndex();
         long prevCommitIndex = context.getCommitIndex();
 
-        context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(),
-            context.getReplicatedLog().lastIndex()));
+        context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(), lastIndex));
 
         if (prevCommitIndex != context.getCommitIndex()) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Commit index set to {}", context.getId(), context.getCommitIndex());
-            }
+            LOG.debug("{}: Commit index set to {}", logName(), context.getCommitIndex());
         }
 
         // If commitIndex > lastApplied: increment lastApplied, apply
         // log[lastApplied] to state machine (§5.3)
         // check if there are any entries to be applied. last-applied can be equal to last-index
         if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
-            context.getLastApplied() < lastIndex()) {
+            context.getLastApplied() < lastIndex) {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("{}: applyLogToStateMachine, " +
-                        "appendEntries.getLeaderCommit():{}," +
-                        "context.getLastApplied():{}, lastIndex():{}", context.getId(),
-                    appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex()
-                );
+                        "appendEntries.getLeaderCommit(): {}," +
+                        "context.getLastApplied(): {}, lastIndex(): {}", logName(),
+                    appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex);
             }
 
             applyLogToStateMachine(appendEntries.getLeaderCommit());
         }
 
-        sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
-            lastIndex(), lastTerm()), actor());
+        AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
+            lastIndex, lastTerm());
+
+        if(LOG.isTraceEnabled()) {
+            LOG.trace("{}: handleAppendEntries returning : {}", logName(), reply);
+        } else if(LOG.isDebugEnabled() && numLogEntries > 0) {
+            LOG.debug("{}: handleAppendEntries returning : {}", logName(), reply);
+        }
+
+        sender.tell(reply, actor());
+
+        if (!context.isSnapshotCaptureInitiated()) {
+            super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
+        }
 
         return this;
     }
@@ -267,10 +259,6 @@ public class Follower extends AbstractRaftActorBehavior {
         return this;
     }
 
-    @Override public RaftState state() {
-        return RaftState.Follower;
-    }
-
     @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
 
         Object message = fromSerializableMessage(originalMessage);
@@ -281,11 +269,15 @@ public class Follower extends AbstractRaftActorBehavior {
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+                LOG.debug("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
+                        logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
             }
         }
 
         if (message instanceof ElectionTimeout) {
+            LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
             return switchBehavior(new Candidate(context));
 
         } else if (message instanceof InstallSnapshot) {
@@ -300,17 +292,16 @@ public class Follower extends AbstractRaftActorBehavior {
 
     private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: InstallSnapshot received by follower " +
-                    "datasize:{} , Chunk:{}/{}", context.getId(), installSnapshot.getData().size(),
-                installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()
-            );
-        }
+        LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}",
+                    logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
+                    installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
 
         if(snapshotTracker == null){
             snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
         }
 
+        updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId());
+
         try {
             if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
                     installSnapshot.getLastChunkHashCode())){
@@ -327,18 +318,23 @@ public class Follower extends AbstractRaftActorBehavior {
 
             }
 
-            sender.tell(new InstallSnapshotReply(
-                    currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
-                    true), actor());
+            InstallSnapshotReply reply = new InstallSnapshotReply(
+                    currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true);
+
+            LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
+
+            sender.tell(reply, actor());
 
         } catch (SnapshotTracker.InvalidChunkException e) {
+            LOG.debug("{}: Exception in InstallSnapshot of follower", logName(), e);
 
             sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
                     -1, false), actor());
             snapshotTracker = null;
 
         } catch (Exception e){
-            LOG.error(e, "{}: Exception in InstallSnapshot of follower", context.getId());
+            LOG.error("{}: Exception in InstallSnapshot of follower", logName(), e);
+
             //send reply with success as false. The chunk will be sent again on failure
             sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
                     installSnapshot.getChunkIndex(), false), actor());
@@ -346,14 +342,45 @@ public class Follower extends AbstractRaftActorBehavior {
         }
     }
 
-    @Override public void close() throws Exception {
+    @Override
+    public void close() throws Exception {
         stopElection();
     }
 
     @VisibleForTesting
-    ByteString getSnapshotChunksCollected(){
-        return snapshotTracker != null ? snapshotTracker.getCollectedChunks() : ByteString.EMPTY;
+    SnapshotTracker getSnapshotTracker(){
+        return snapshotTracker;
     }
 
+    private static class InitialSyncStatusTracker {
+
+        private static final long INVALID_LOG_INDEX = -2L;
+        private long initialLeaderCommit = INVALID_LOG_INDEX;
+        private boolean initialSyncUpDone = false;
+        private String syncedLeaderId = null;
+        private final ActorRef actor;
+
+        public InitialSyncStatusTracker(ActorRef actor) {
+            this.actor = actor;
+        }
+
+        public void update(String leaderId, long leaderCommit, long commitIndex){
 
+            if(!leaderId.equals(syncedLeaderId)){
+                initialSyncUpDone = false;
+                initialLeaderCommit = INVALID_LOG_INDEX;
+                syncedLeaderId = leaderId;
+            }
+
+            if(!initialSyncUpDone){
+                if(initialLeaderCommit == INVALID_LOG_INDEX){
+                    actor.tell(new FollowerInitialSyncUpStatus(false), ActorRef.noSender());
+                    initialLeaderCommit = leaderCommit;
+                } else if(commitIndex >= initialLeaderCommit){
+                    actor.tell(new FollowerInitialSyncUpStatus(true), ActorRef.noSender());
+                    initialSyncUpDone = true;
+                }
+            }
+        }
+    }
 }