Alleviate premature elections in followers
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
index e4d42661de3a9612d558e29685c55bf195d9a331..efece88e28e406a2b587e51a76c424e2a1219f32 100644 (file)
@@ -11,7 +11,9 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import akka.actor.ActorRef;
 import akka.japi.Procedure;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
 import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -20,6 +22,7 @@ import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@@ -50,6 +53,7 @@ public class Follower extends AbstractRaftActorBehavior {
         }
     };
 
+    private final Stopwatch lastLeaderMessageTimer = Stopwatch.createUnstarted();
     private SnapshotTracker snapshotTracker = null;
     private String leaderId;
     private short leaderPayloadVersion;
@@ -66,7 +70,7 @@ public class Follower extends AbstractRaftActorBehavior {
         initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
 
         if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
-            actor().tell(ElectionTimeout.INSTANCE, actor());
+            actor().tell(TimeoutNow.INSTANCE, actor());
         } else {
             scheduleElection(electionDuration());
         }
@@ -92,6 +96,14 @@ public class Follower extends AbstractRaftActorBehavior {
         this.leaderPayloadVersion = leaderPayloadVersion;
     }
 
+    private void restartLastLeaderMessageTimer() {
+        if (lastLeaderMessageTimer.isRunning()) {
+            lastLeaderMessageTimer.reset();
+        }
+
+        lastLeaderMessageTimer.start();
+    }
+
     private boolean isLogEntryPresent(long index){
         if(context.getReplicatedLog().isInSnapshot(index)) {
             return true;
@@ -343,15 +355,8 @@ public class Follower extends AbstractRaftActorBehavior {
 
     @Override
     public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
-        if (originalMessage instanceof ElectionTimeout) {
-            if (canStartElection()) {
-                LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
-                return internalSwitchBehavior(RaftState.Candidate);
-            } else {
-                setLeaderId(null);
-                scheduleElection(electionDuration());
-                return this;
-            }
+        if (originalMessage instanceof ElectionTimeout || originalMessage instanceof TimeoutNow) {
+            return handleElectionTimeout(originalMessage);
         }
 
         final Object message = fromSerializableMessage(originalMessage);
@@ -372,19 +377,50 @@ public class Follower extends AbstractRaftActorBehavior {
         }
 
         if (rpc instanceof InstallSnapshot) {
-            InstallSnapshot installSnapshot = (InstallSnapshot) rpc;
-            handleInstallSnapshot(sender, installSnapshot);
+            handleInstallSnapshot(sender, (InstallSnapshot) rpc);
+            restartLastLeaderMessageTimer();
             scheduleElection(electionDuration());
             return this;
         }
 
         if (!(rpc instanceof RequestVote) || canGrantVote((RequestVote) rpc)) {
+            restartLastLeaderMessageTimer();
             scheduleElection(electionDuration());
         }
 
         return super.handleMessage(sender, rpc);
     }
 
+    private RaftActorBehavior handleElectionTimeout(Object message) {
+        // If the message is ElectionTimeout, verify we haven't actually seen a message from the leader
+        // during the election timeout interval. It may that the election timer expired b/c this actor
+        // was busy and messages got delayed, in which case leader messages would be backed up in the
+        // queue but would be processed before the ElectionTimeout message and thus would restart the
+        // lastLeaderMessageTimer.
+        long lastLeaderMessageInterval = lastLeaderMessageTimer.elapsed(TimeUnit.MILLISECONDS);
+        boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning() || lastLeaderMessageInterval >=
+                context.getConfigParams().getElectionTimeOutInterval().toMillis();
+
+        if(canStartElection()) {
+            if(message instanceof TimeoutNow || noLeaderMessageReceived) {
+                LOG.debug("{}: Received {} - switching to Candidate", logName(), message.getClass().getSimpleName());
+                return internalSwitchBehavior(RaftState.Candidate);
+            } else {
+                LOG.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout",
+                        logName(), lastLeaderMessageInterval);
+                scheduleElection(electionDuration());
+            }
+        } else if(message instanceof ElectionTimeout) {
+            if(noLeaderMessageReceived) {
+                setLeaderId(null);
+            }
+
+            scheduleElection(electionDuration());
+        }
+
+        return this;
+    }
+
     private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
 
         LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);