Bug 5450: Query akka cluster state on Follower ElectionTimeout
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
index efece88e28e406a2b587e51a76c424e2a1219f32..73e2cf9bc576d7831f9d12ba42cd849e7c1dea7a 100644 (file)
@@ -9,16 +9,23 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Address;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent.CurrentClusterState;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
 import akka.japi.Procedure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import java.util.ArrayList;
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -30,6 +37,7 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 
 /**
  * The behavior of a RaftActor in the Follower state
@@ -44,6 +52,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 public class Follower extends AbstractRaftActorBehavior {
     private static final int SYNC_THRESHOLD = 10;
 
+    private static final long MAX_ELECTION_TIMEOUT_FACTOR = 18;
+
     private final SyncStatusTracker initialSyncStatusTracker;
 
     private final Procedure<ReplicatedLogEntry> appendAndPersistCallback = new Procedure<ReplicatedLogEntry>() {
@@ -53,7 +63,7 @@ public class Follower extends AbstractRaftActorBehavior {
         }
     };
 
-    private final Stopwatch lastLeaderMessageTimer = Stopwatch.createUnstarted();
+    private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted();
     private SnapshotTracker snapshotTracker = null;
     private String leaderId;
     private short leaderPayloadVersion;
@@ -354,12 +364,11 @@ public class Follower extends AbstractRaftActorBehavior {
     }
 
     @Override
-    public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
-        if (originalMessage instanceof ElectionTimeout || originalMessage instanceof TimeoutNow) {
-            return handleElectionTimeout(originalMessage);
+    public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+        if (message instanceof ElectionTimeout || message instanceof TimeoutNow) {
+            return handleElectionTimeout(message);
         }
 
-        final Object message = fromSerializableMessage(originalMessage);
         if (!(message instanceof RaftRPC)) {
             // The rest of the processing requires the message to be a RaftRPC
             return null;
@@ -398,16 +407,30 @@ public class Follower extends AbstractRaftActorBehavior {
         // queue but would be processed before the ElectionTimeout message and thus would restart the
         // lastLeaderMessageTimer.
         long lastLeaderMessageInterval = lastLeaderMessageTimer.elapsed(TimeUnit.MILLISECONDS);
-        boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning() || lastLeaderMessageInterval >=
-                context.getConfigParams().getElectionTimeOutInterval().toMillis();
+        long electionTimeoutInMillis = context.getConfigParams().getElectionTimeOutInterval().toMillis();
+        boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning() ||
+                lastLeaderMessageInterval >= electionTimeoutInMillis;
 
         if(canStartElection()) {
-            if(message instanceof TimeoutNow || noLeaderMessageReceived) {
-                LOG.debug("{}: Received {} - switching to Candidate", logName(), message.getClass().getSimpleName());
+            if(message instanceof TimeoutNow) {
+                LOG.debug("{}: Received TimeoutNow - switching to Candidate", logName());
                 return internalSwitchBehavior(RaftState.Candidate);
+            } else if(noLeaderMessageReceived) {
+                // Check the cluster state to see if the leader is known to be up before we go to Candidate.
+                // However if we haven't heard from the leader in a long time even though the cluster state
+                // indicates it's up then something is wrong - leader might be stuck indefinitely - so switch
+                // to Candidate,
+                long maxElectionTimeout = electionTimeoutInMillis * MAX_ELECTION_TIMEOUT_FACTOR;
+                if(isLeaderAvailabilityKnown() && lastLeaderMessageInterval < maxElectionTimeout) {
+                    LOG.debug("{}: Received ElectionTimeout but leader appears to be available", logName());
+                    scheduleElection(electionDuration());
+                } else {
+                    LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
+                    return internalSwitchBehavior(RaftState.Candidate);
+                }
             } else {
-                LOG.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout",
-                        logName(), lastLeaderMessageInterval);
+                LOG.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout {}",
+                        logName(), lastLeaderMessageInterval, context.getConfigParams().getElectionTimeOutInterval());
                 scheduleElection(electionDuration());
             }
         } else if(message instanceof ElectionTimeout) {
@@ -421,6 +444,55 @@ public class Follower extends AbstractRaftActorBehavior {
         return this;
     }
 
+    private boolean isLeaderAvailabilityKnown() {
+        if(leaderId == null) {
+            return false;
+        }
+
+        Optional<Cluster> cluster = context.getCluster();
+        if(!cluster.isPresent()) {
+            return false;
+        }
+
+        ActorSelection leaderActor = context.getPeerActorSelection(leaderId);
+        if(leaderActor == null) {
+            return false;
+        }
+
+        Address leaderAddress = leaderActor.anchorPath().address();
+
+        CurrentClusterState state = cluster.get().state();
+        Set<Member> unreachable = state.getUnreachable();
+
+        LOG.debug("{}: Checking for leader {} in the cluster unreachable set {}", logName(), leaderAddress,
+                unreachable);
+
+        for(Member m: unreachable) {
+            if(leaderAddress.equals(m.address())) {
+                LOG.info("{}: Leader {} is unreachable", logName(), leaderAddress);
+                return false;
+            }
+        }
+
+        for(Member m: state.getMembers()) {
+            if(leaderAddress.equals(m.address())) {
+                if(m.status() == MemberStatus.up() || m.status() == MemberStatus.weaklyUp()) {
+                    LOG.debug("{}: Leader {} cluster status is {} - leader is available", logName(),
+                            leaderAddress, m.status());
+                    return true;
+                } else {
+                    LOG.debug("{}: Leader {} cluster status is {} - leader is unavailable", logName(),
+                            leaderAddress, m.status());
+                    return false;
+                }
+            }
+        }
+
+        LOG.debug("{}: Leader {} not found in the cluster member set", logName(), leaderAddress);
+
+        return false;
+    }
+
     private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
 
         LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);