Bump upstream SNAPSHOTS
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
index d88c30db333bcf040b20febc76884c9bc13e9147..f115db1c79709d46a1735d0e4e46f4f832122c75 100644 (file)
@@ -16,8 +16,10 @@ import akka.cluster.Member;
 import akka.cluster.MemberStatus;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -30,6 +32,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -51,6 +54,7 @@ import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
  * convert to candidate
  * </ul>
  */
+// Non-final for testing
 public class Follower extends AbstractRaftActorBehavior {
     private static final long MAX_ELECTION_TIMEOUT_FACTOR = 18;
 
@@ -67,11 +71,13 @@ public class Follower extends AbstractRaftActorBehavior {
         this(context, null, (short)-1);
     }
 
+    @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR",
+        justification = "electionDuration() is not final for Candidate override")
     public Follower(final RaftActorContext context, final String initialLeaderId,
             final short initialLeaderPayloadVersion) {
         super(context, RaftState.Follower);
-        this.leaderId = initialLeaderId;
-        this.leaderPayloadVersion = initialLeaderPayloadVersion;
+        leaderId = initialLeaderId;
+        leaderPayloadVersion = initialLeaderPayloadVersion;
 
         initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), context.getConfigParams()
             .getSyncIndexThreshold());
@@ -435,6 +441,11 @@ public class Follower extends AbstractRaftActorBehavior {
         return this;
     }
 
+    @Override
+    final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) {
+        return new ApplyState(null, null, entry);
+    }
+
     @Override
     public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
         if (message instanceof ElectionTimeout || message instanceof TimeoutNow) {
@@ -500,6 +511,10 @@ public class Follower extends AbstractRaftActorBehavior {
                 if (isLeaderAvailabilityKnown() && lastLeaderMessageInterval < maxElectionTimeout) {
                     log.debug("{}: Received ElectionTimeout but leader appears to be available", logName());
                     scheduleElection(electionDuration());
+                } else if (isThisFollowerIsolated()) {
+                    log.debug("{}: this follower is isolated. Do not switch to Candidate for now.", logName());
+                    setLeaderId(null);
+                    scheduleElection(electionDuration());
                 } else {
                     log.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
                     return internalSwitchBehavior(RaftState.Candidate);
@@ -569,6 +584,40 @@ public class Follower extends AbstractRaftActorBehavior {
         return false;
     }
 
+    private boolean isThisFollowerIsolated() {
+        final Optional<Cluster> maybeCluster = context.getCluster();
+        if (!maybeCluster.isPresent()) {
+            return false;
+        }
+
+        final Cluster cluster = maybeCluster.get();
+        final Member selfMember = cluster.selfMember();
+
+        final CurrentClusterState state = cluster.state();
+        final Set<Member> unreachable = state.getUnreachable();
+        final Iterable<Member> members = state.getMembers();
+
+        log.debug("{}: Checking if this node is isolated in the cluster unreachable set {},"
+                        + "all members {} self member: {}", logName(), unreachable, members, selfMember);
+
+        // no unreachable peers means we cannot be isolated
+        if (unreachable.size() == 0) {
+            return false;
+        }
+
+        final Set<Member> membersToCheck = new HashSet<>();
+        members.forEach(membersToCheck::add);
+
+        membersToCheck.removeAll(unreachable);
+
+        // check if the only member not unreachable is us
+        if (membersToCheck.size() == 1 && membersToCheck.iterator().next().equals(selfMember)) {
+            return true;
+        }
+
+        return false;
+    }
+
     private void handleInstallSnapshot(final ActorRef sender, final InstallSnapshot installSnapshot) {
 
         log.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);