BUG 2185: Expand the scope of sync status to cover a slow follower
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
index c1d261c56192cb0b1bbe6905b77faa7d1d8fad0c..c8535614a9462bb5d4ee6cdfb6c3e3428c1cb854 100644 (file)
@@ -17,12 +17,12 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
-import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
 /**
@@ -37,16 +37,16 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
  */
 public class Follower extends AbstractRaftActorBehavior {
 
-
-
     private SnapshotTracker snapshotTracker = null;
 
-    private final InitialSyncStatusTracker initialSyncStatusTracker;
+    private final SyncStatusTracker initialSyncStatusTracker;
+
+    private static final int SYNC_THRESHOLD = 10;
 
     public Follower(RaftActorContext context) {
         super(context, RaftState.Follower);
 
-        initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor());
+        initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
 
         if(context.getRaftPolicy().automaticElectionsEnabled()) {
             if (context.getPeerAddresses().isEmpty()) {
@@ -148,7 +148,7 @@ public class Follower extends AbstractRaftActorBehavior {
             int addEntriesFrom = 0;
             if (context.getReplicatedLog().size() > 0) {
 
-                // Find the entry up until which the one that is not in the follower's log
+                // Find the entry up until the one that is not in the follower's log
                 for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
                     ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i);
                     ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex());
@@ -162,12 +162,19 @@ public class Follower extends AbstractRaftActorBehavior {
                         continue;
                     }
 
-                    LOG.debug("{}: Removing entries from log starting at {}", logName(),
+                    if(!context.getRaftPolicy().applyModificationToStateBeforeConsensus()) {
+
+                        LOG.debug("{}: Removing entries from log starting at {}", logName(),
                                 matchEntry.getIndex());
 
-                    // Entries do not match so remove all subsequent entries
-                    context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex());
-                    break;
+                        // Entries do not match so remove all subsequent entries
+                        context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex());
+                        break;
+                    } else {
+                        sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
+                                lastTerm(), context.getPayloadVersion(), true), actor());
+                        return this;
+                    }
                 }
             }
 
@@ -314,14 +321,16 @@ public class Follower extends AbstractRaftActorBehavior {
 
         if (message instanceof ElectionTimeout) {
             LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
-            return switchBehavior(new Candidate(context));
+            return internalSwitchBehavior(RaftState.Candidate);
 
         } else if (message instanceof InstallSnapshot) {
             InstallSnapshot installSnapshot = (InstallSnapshot) message;
             handleInstallSnapshot(sender, installSnapshot);
         }
 
-        scheduleElection(electionDuration());
+        if(message instanceof RaftRPC && (!(message instanceof RequestVote) || (canGrantVote((RequestVote) message)))){
+            scheduleElection(electionDuration());
+        }
 
         return super.handleMessage(sender, message);
     }
@@ -387,36 +396,4 @@ public class Follower extends AbstractRaftActorBehavior {
     SnapshotTracker getSnapshotTracker(){
         return snapshotTracker;
     }
-
-    private class InitialSyncStatusTracker {
-
-        private static final long INVALID_LOG_INDEX = -2L;
-        private long initialLeaderCommit = INVALID_LOG_INDEX;
-        private boolean initialSyncUpDone = false;
-        private String syncedLeaderId = null;
-        private final ActorRef actor;
-
-        public InitialSyncStatusTracker(ActorRef actor) {
-            this.actor = actor;
-        }
-
-        public void update(String leaderId, long leaderCommit, long commitIndex){
-
-            if(!leaderId.equals(syncedLeaderId)){
-                initialSyncUpDone = false;
-                initialLeaderCommit = INVALID_LOG_INDEX;
-                syncedLeaderId = leaderId;
-            }
-
-            if(!initialSyncUpDone){
-                if(initialLeaderCommit == INVALID_LOG_INDEX){
-                    actor.tell(new FollowerInitialSyncUpStatus(false, getId()), ActorRef.noSender());
-                    initialLeaderCommit = leaderCommit;
-                } else if(commitIndex >= initialLeaderCommit){
-                    actor.tell(new FollowerInitialSyncUpStatus(true, getId()), ActorRef.noSender());
-                    initialSyncUpDone = true;
-                }
-            }
-        }
-    }
 }