Bug 5450: Query akka cluster state on Follower ElectionTimeout
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
index 5f37af6e9d828482d16aac95178fc47418f27d6d..73e2cf9bc576d7831f9d12ba42cd849e7c1dea7a 100644 (file)
@@ -9,10 +9,18 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Address;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent.CurrentClusterState;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
 import akka.japi.Procedure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import java.util.ArrayList;
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
@@ -44,6 +52,8 @@ import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPay
 public class Follower extends AbstractRaftActorBehavior {
     private static final int SYNC_THRESHOLD = 10;
 
+    private static final long MAX_ELECTION_TIMEOUT_FACTOR = 18;
+
     private final SyncStatusTracker initialSyncStatusTracker;
 
     private final Procedure<ReplicatedLogEntry> appendAndPersistCallback = new Procedure<ReplicatedLogEntry>() {
@@ -53,7 +63,7 @@ public class Follower extends AbstractRaftActorBehavior {
         }
     };
 
-    private final Stopwatch lastLeaderMessageTimer = Stopwatch.createUnstarted();
+    private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted();
     private SnapshotTracker snapshotTracker = null;
     private String leaderId;
     private short leaderPayloadVersion;
@@ -397,16 +407,30 @@ public class Follower extends AbstractRaftActorBehavior {
         // queue but would be processed before the ElectionTimeout message and thus would restart the
         // lastLeaderMessageTimer.
         long lastLeaderMessageInterval = lastLeaderMessageTimer.elapsed(TimeUnit.MILLISECONDS);
-        boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning() || lastLeaderMessageInterval >=
-                context.getConfigParams().getElectionTimeOutInterval().toMillis();
+        long electionTimeoutInMillis = context.getConfigParams().getElectionTimeOutInterval().toMillis();
+        boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning() ||
+                lastLeaderMessageInterval >= electionTimeoutInMillis;
 
         if(canStartElection()) {
-            if(message instanceof TimeoutNow || noLeaderMessageReceived) {
-                LOG.debug("{}: Received {} - switching to Candidate", logName(), message.getClass().getSimpleName());
+            if(message instanceof TimeoutNow) {
+                LOG.debug("{}: Received TimeoutNow - switching to Candidate", logName());
                 return internalSwitchBehavior(RaftState.Candidate);
+            } else if(noLeaderMessageReceived) {
+                // Check the cluster state to see if the leader is known to be up before we go to Candidate.
+                // However, if we haven't heard from the leader in a long time even though the cluster state
+                // indicates it's up, then something is wrong - the leader might be stuck indefinitely - so
+                // switch to Candidate.
+                long maxElectionTimeout = electionTimeoutInMillis * MAX_ELECTION_TIMEOUT_FACTOR;
+                if(isLeaderAvailabilityKnown() && lastLeaderMessageInterval < maxElectionTimeout) {
+                    LOG.debug("{}: Received ElectionTimeout but leader appears to be available", logName());
+                    scheduleElection(electionDuration());
+                } else {
+                    LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
+                    return internalSwitchBehavior(RaftState.Candidate);
+                }
             } else {
-                LOG.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout",
-                        logName(), lastLeaderMessageInterval);
+                LOG.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout {}",
+                        logName(), lastLeaderMessageInterval, context.getConfigParams().getElectionTimeOutInterval());
                 scheduleElection(electionDuration());
             }
         } else if(message instanceof ElectionTimeout) {
@@ -420,6 +444,55 @@ public class Follower extends AbstractRaftActorBehavior {
         return this;
     }
 
+    private boolean isLeaderAvailabilityKnown() { // true only if the leader is a reachable Up/WeaklyUp cluster member
+        if(leaderId == null) {
+            return false;
+        }
+        // Without a Cluster instance there is no cluster state to consult.
+        Optional<Cluster> cluster = context.getCluster();
+        if(!cluster.isPresent()) {
+            return false;
+        }
+        // The leader's address is taken from its peer actor selection; give up if there is none.
+        ActorSelection leaderActor = context.getPeerActorSelection(leaderId);
+        if(leaderActor == null) {
+            return false;
+        }
+
+        Address leaderAddress = leaderActor.anchorPath().address();
+
+        CurrentClusterState state = cluster.get().state();
+        Set<Member> unreachable = state.getUnreachable();
+
+        LOG.debug("{}: Checking for leader {} in the cluster unreachable set {}", logName(), leaderAddress,
+                unreachable);
+        // A leader listed in the unreachable set is reported as unavailable without consulting its status.
+        for(Member m: unreachable) {
+            if(leaderAddress.equals(m.address())) {
+                LOG.info("{}: Leader {} is unreachable", logName(), leaderAddress);
+                return false;
+            }
+        }
+        // Otherwise the leader counts as available only if its member status is Up or WeaklyUp.
+        for(Member m: state.getMembers()) {
+            if(leaderAddress.equals(m.address())) {
+                if(m.status() == MemberStatus.up() || m.status() == MemberStatus.weaklyUp()) {
+                    LOG.debug("{}: Leader {} cluster status is {} - leader is available", logName(),
+                            leaderAddress, m.status());
+                    return true;
+                } else {
+                    LOG.debug("{}: Leader {} cluster status is {} - leader is unavailable", logName(),
+                            leaderAddress, m.status());
+                    return false;
+                }
+            }
+        }
+        // Falling through means no member entry matched the leader's address.
+        LOG.debug("{}: Leader {} not found in the cluster member set", logName(), leaderAddress);
+
+        return false;
+    }
+
     private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
 
         LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);