Introduce a mechanism for a Follower to signal it's sync up status
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
index c799441d603597ea25d530db8dc6eddc9ced68b6..618865cb88eb8877cdcfdcfb29208c80707c2c0f 100644 (file)
@@ -17,6 +17,7 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@@ -36,12 +37,18 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
  */
 public class Follower extends AbstractRaftActorBehavior {
 
+
+
     private SnapshotTracker snapshotTracker = null;
 
+    private final InitialSyncStatusTracker initialSyncStatusTracker;
+
     public Follower(RaftActorContext context) {
         super(context, RaftState.Follower);
 
         scheduleElection(electionDuration());
+
+        initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor());
     }
 
     private boolean isLogEntryPresent(long index){
@@ -71,6 +78,10 @@ public class Follower extends AbstractRaftActorBehavior {
         return -1;
     }
 
+    private void updateInitialSyncStatus(long currentLeaderCommit, String leaderId){
+        initialSyncStatusTracker.update(leaderId, currentLeaderCommit, context.getCommitIndex());
+    }
+
     @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
                                                               AppendEntries appendEntries) {
 
@@ -97,6 +108,7 @@ public class Follower extends AbstractRaftActorBehavior {
         long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex());
         boolean prevEntryPresent = isLogEntryPresent(appendEntries.getPrevLogIndex());
 
+        updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
 
         boolean outOfSync = true;
 
@@ -280,7 +292,6 @@ public class Follower extends AbstractRaftActorBehavior {
 
     private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
 
-
         LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}",
                     logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
                     installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
@@ -289,6 +300,8 @@ public class Follower extends AbstractRaftActorBehavior {
             snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
         }
 
+        updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId());
+
         try {
             if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
                     installSnapshot.getLastChunkHashCode())){
@@ -338,4 +351,36 @@ public class Follower extends AbstractRaftActorBehavior {
     SnapshotTracker getSnapshotTracker(){
         return snapshotTracker;
     }
+
+    private static class InitialSyncStatusTracker {
+
+        private static final long INVALID_LOG_INDEX = -2L;
+        private long initialLeaderCommit = INVALID_LOG_INDEX;
+        private boolean initialSyncUpDone = false;
+        private String syncedLeaderId = null;
+        private final ActorRef actor;
+
+        public InitialSyncStatusTracker(ActorRef actor) {
+            this.actor = actor;
+        }
+
+        public void update(String leaderId, long leaderCommit, long commitIndex){
+
+            if(!leaderId.equals(syncedLeaderId)){
+                initialSyncUpDone = false;
+                initialLeaderCommit = INVALID_LOG_INDEX;
+                syncedLeaderId = leaderId;
+            }
+
+            if(!initialSyncUpDone){
+                if(initialLeaderCommit == INVALID_LOG_INDEX){
+                    actor.tell(new FollowerInitialSyncUpStatus(false), ActorRef.noSender());
+                    initialLeaderCommit = leaderCommit;
+                } else if(commitIndex >= initialLeaderCommit){
+                    actor.tell(new FollowerInitialSyncUpStatus(true), ActorRef.noSender());
+                    initialSyncUpDone = true;
+                }
+            }
+        }
+    }
 }