Do not bump follower term while it is isolated
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
index 11fa23748e776c11372b74e8e885117db0f06fd1..288ce32a64ec21286adda9cf3672f7b53d1bf506 100644 (file)
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
@@ -15,22 +14,24 @@ import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent.CurrentClusterState;
 import akka.cluster.Member;
 import akka.cluster.MemberStatus;
-import akka.japi.Procedure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nullable;
+import java.util.function.Consumer;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -94,7 +95,7 @@ public class Follower extends AbstractRaftActorBehavior {
     }
 
     @VisibleForTesting
-    protected final void setLeaderId(@Nullable final String leaderId) {
+    protected final void setLeaderId(final @Nullable String leaderId) {
         this.leaderId = leaderId;
     }
 
@@ -132,7 +133,6 @@ public class Follower extends AbstractRaftActorBehavior {
 
     @Override
     protected RaftActorBehavior handleAppendEntries(final ActorRef sender, final AppendEntries appendEntries) {
-
         int numLogEntries = appendEntries.getEntries().size();
         if (log.isTraceEnabled()) {
             log.trace("{}: handleAppendEntries: {}", logName(), appendEntries);
@@ -140,10 +140,6 @@ public class Follower extends AbstractRaftActorBehavior {
             log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
         }
 
-        // TODO : Refactor this method into a bunch of smaller methods
-        // to make it easier to read. Before refactoring ensure tests
-        // cover the code properly
-
         if (snapshotTracker != null && !snapshotTracker.getLeaderId().equals(appendEntries.getLeaderId())) {
             log.debug("{}: snapshot install is in progress but the prior snapshot leaderId {} does not match the "
                 + "AppendEntries leaderId {}", logName(), snapshotTracker.getLeaderId(), appendEntries.getLeaderId());
@@ -153,7 +149,8 @@ public class Follower extends AbstractRaftActorBehavior {
         if (snapshotTracker != null || context.getSnapshotManager().isApplying()) {
             // if snapshot install is in progress, follower should just acknowledge append entries with a reply.
             AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
-                    lastIndex(), lastTerm(), context.getPayloadVersion());
+                    lastIndex(), lastTerm(), context.getPayloadVersion(), false, needsLeaderAddress(),
+                    appendEntries.getLeaderRaftVersion());
 
             log.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
             sender.tell(reply, actor());
@@ -165,124 +162,29 @@ public class Follower extends AbstractRaftActorBehavior {
         leaderId = appendEntries.getLeaderId();
         leaderPayloadVersion = appendEntries.getPayloadVersion();
 
-        // First check if the logs are in sync or not
-        long lastIndex = lastIndex();
-
-        if (isOutOfSync(appendEntries)) {
-            // We found that the log was out of sync so just send a negative
-            // reply and return
+        if (appendEntries.getLeaderAddress().isPresent()) {
+            final String address = appendEntries.getLeaderAddress().get();
+            log.debug("New leader address: {}", address);
 
-            final AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
-                    lastTerm(), context.getPayloadVersion());
+            context.setPeerAddress(leaderId, address);
+            context.getConfigParams().getPeerAddressResolver().setResolved(leaderId, address);
+        }
 
-            log.info("{}: Follower is out-of-sync so sending negative reply: {}", logName(), reply);
+        // First check if the logs are in sync or not
+        if (isOutOfSync(appendEntries, sender)) {
             updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
-            sender.tell(reply, actor());
             return this;
         }
 
-        if (numLogEntries > 0) {
-            log.debug("{}: Number of entries to be appended = {}", logName(), numLogEntries);
-
-            // 3. If an existing entry conflicts with a new one (same index
-            // but different terms), delete the existing entry and all that
-            // follow it (§5.3)
-            int addEntriesFrom = 0;
-            if (context.getReplicatedLog().size() > 0) {
-
-                // Find the entry up until the one that is not in the follower's log
-                for (int i = 0;i < numLogEntries; i++, addEntriesFrom++) {
-                    ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i);
-
-                    if (!isLogEntryPresent(matchEntry.getIndex())) {
-                        // newEntry not found in the log
-                        break;
-                    }
-
-                    long existingEntryTerm = getLogEntryTerm(matchEntry.getIndex());
-
-                    log.debug("{}: matchEntry {} is present: existingEntryTerm: {}", logName(), matchEntry,
-                            existingEntryTerm);
-
-                    // existingEntryTerm == -1 means it's in the snapshot and not in the log. We don't know
-                    // what the term was so we'll assume it matches.
-                    if (existingEntryTerm == -1 || existingEntryTerm == matchEntry.getTerm()) {
-                        continue;
-                    }
-
-                    if (!context.getRaftPolicy().applyModificationToStateBeforeConsensus()) {
-
-                        log.info("{}: Removing entries from log starting at {}", logName(), matchEntry.getIndex());
-
-                        // Entries do not match so remove all subsequent entries
-                        if (!context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex())) {
-                            // Could not remove the entries - this means the matchEntry index must be in the
-                            // snapshot and not the log. In this case the prior entries are part of the state
-                            // so we must send back a reply to force a snapshot to completely re-sync the
-                            // follower's log and state.
-
-                            log.info("{}: Could not remove entries - sending reply to force snapshot", logName());
-                            updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
-                            sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
-                                    lastTerm(), context.getPayloadVersion(), true), actor());
-                            return this;
-                        }
-
-                        break;
-                    } else {
-                        updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
-                        sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
-                                lastTerm(), context.getPayloadVersion(), true), actor());
-                        return this;
-                    }
-                }
-            }
-
-            lastIndex = lastIndex();
-            log.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(),
-                    lastIndex, addEntriesFrom);
-
-            // When persistence successfully completes for each new log entry appended, we need to determine if we
-            // should capture a snapshot to compact the persisted log. shouldCaptureSnapshot tracks whether or not
-            // one of the log entries has exceeded the log size threshold whereby a snapshot should be taken. However
-            // we don't initiate the snapshot at that log entry but rather after the last log entry has been persisted.
-            // This is done because subsequent log entries after the one that tripped the threshold may have been
-            // applied to the state already, as the persistence callback occurs async, and we want those entries
-            // purged from the persisted log as well.
-            final AtomicBoolean shouldCaptureSnapshot = new AtomicBoolean(false);
-            final Procedure<ReplicatedLogEntry> appendAndPersistCallback = logEntry -> {
-                final List<ReplicatedLogEntry> entries = appendEntries.getEntries();
-                final ReplicatedLogEntry lastEntryToAppend = entries.get(entries.size() - 1);
-                if (shouldCaptureSnapshot.get() && logEntry == lastEntryToAppend) {
-                    context.getSnapshotManager().capture(context.getReplicatedLog().last(), getReplicatedToAllIndex());
-                }
-            };
-
-            // 4. Append any new entries not already in the log
-            for (int i = addEntriesFrom; i < numLogEntries; i++) {
-                ReplicatedLogEntry entry = appendEntries.getEntries().get(i);
-
-                log.debug("{}: Append entry to log {}", logName(), entry.getData());
-
-                context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback, false);
-
-                shouldCaptureSnapshot.compareAndSet(false,
-                        context.getReplicatedLog().shouldCaptureSnapshot(entry.getIndex()));
-
-                if (entry.getData() instanceof ServerConfigurationPayload) {
-                    context.updatePeerIds((ServerConfigurationPayload)entry.getData());
-                }
-            }
-
-            log.debug("{}: Log size is now {}", logName(), context.getReplicatedLog().size());
+        if (!processNewEntries(appendEntries, sender)) {
+            updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
+            return this;
         }
 
-        // 5. If leaderCommit > commitIndex, set commitIndex =
-        // min(leaderCommit, index of last new entry)
-
-        lastIndex = lastIndex();
+        long lastIndex = lastIndex();
         long prevCommitIndex = context.getCommitIndex();
 
+        // If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
         if (appendEntries.getLeaderCommit() > prevCommitIndex) {
             context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(), lastIndex));
         }
@@ -292,7 +194,8 @@ public class Follower extends AbstractRaftActorBehavior {
         }
 
         AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
-                lastIndex, lastTerm(), context.getPayloadVersion());
+                lastIndex, lastTerm(), context.getPayloadVersion(), false, needsLeaderAddress(),
+                appendEntries.getLeaderRaftVersion());
 
         if (log.isTraceEnabled()) {
             log.trace("{}: handleAppendEntries returning : {}", logName(), reply);
@@ -301,14 +204,13 @@ public class Follower extends AbstractRaftActorBehavior {
         }
 
         // Reply to the leader before applying any previous state so as not to hold up leader consensus.
-        updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
         sender.tell(reply, actor());
 
-        // If commitIndex > lastApplied: increment lastApplied, apply
-        // log[lastApplied] to state machine (§5.3)
-        // check if there are any entries to be applied. last-applied can be equal to last-index
-        if (appendEntries.getLeaderCommit() > context.getLastApplied()
-                && context.getLastApplied() < lastIndex) {
+        updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
+
+        // If leaderCommit > lastApplied, increment lastApplied and apply log[lastApplied] to state machine (§5.3).
+        // lastApplied can be equal to lastIndex.
+        if (appendEntries.getLeaderCommit() > context.getLastApplied() && context.getLastApplied() < lastIndex) {
             if (log.isDebugEnabled()) {
                 log.debug("{}: applyLogToStateMachine, appendEntries.getLeaderCommit(): {}, "
                         + "context.getLastApplied(): {}, lastIndex(): {}", logName(),
@@ -327,7 +229,112 @@ public class Follower extends AbstractRaftActorBehavior {
         return this;
     }
 
-    private boolean isOutOfSync(final AppendEntries appendEntries) {
+    private boolean processNewEntries(final AppendEntries appendEntries, final ActorRef sender) {
+        int numLogEntries = appendEntries.getEntries().size();
+        if (numLogEntries == 0) {
+            return true;
+        }
+
+        log.debug("{}: Number of entries to be appended = {}", logName(), numLogEntries);
+
+        long lastIndex = lastIndex();
+        int addEntriesFrom = 0;
+
+        // First check for conflicting entries. If an existing entry conflicts with a new one (same index but different
+        // term), delete the existing entry and all that follow it (§5.3)
+        if (context.getReplicatedLog().size() > 0) {
+            // Find the entry up until the one that is not in the follower's log
+            for (int i = 0;i < numLogEntries; i++, addEntriesFrom++) {
+                ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i);
+
+                if (!isLogEntryPresent(matchEntry.getIndex())) {
+                    // newEntry not found in the log
+                    break;
+                }
+
+                long existingEntryTerm = getLogEntryTerm(matchEntry.getIndex());
+
+                log.debug("{}: matchEntry {} is present: existingEntryTerm: {}", logName(), matchEntry,
+                        existingEntryTerm);
+
+                // existingEntryTerm == -1 means it's in the snapshot and not in the log. We don't know
+                // what the term was so we'll assume it matches.
+                if (existingEntryTerm == -1 || existingEntryTerm == matchEntry.getTerm()) {
+                    continue;
+                }
+
+                if (!context.getRaftPolicy().applyModificationToStateBeforeConsensus()) {
+                    log.info("{}: Removing entries from log starting at {}, commitIndex: {}, lastApplied: {}",
+                            logName(), matchEntry.getIndex(), context.getCommitIndex(), context.getLastApplied());
+
+                    // Entries do not match so remove all subsequent entries but only if the existing entries haven't
+                    // been applied to the state yet.
+                    if (matchEntry.getIndex() <= context.getLastApplied()
+                            || !context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex())) {
+                        // Could not remove the entries - this means the matchEntry index must be in the
+                        // snapshot and not the log. In this case the prior entries are part of the state
+                        // so we must send back a reply to force a snapshot to completely re-sync the
+                        // follower's log and state.
+
+                        log.info("{}: Could not remove entries - sending reply to force snapshot", logName());
+                        sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
+                                lastTerm(), context.getPayloadVersion(), true, needsLeaderAddress(),
+                                appendEntries.getLeaderRaftVersion()), actor());
+                        return false;
+                    }
+
+                    break;
+                } else {
+                    sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
+                            lastTerm(), context.getPayloadVersion(), true, needsLeaderAddress(),
+                            appendEntries.getLeaderRaftVersion()), actor());
+                    return false;
+                }
+            }
+        }
+
+        lastIndex = lastIndex();
+        log.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(), lastIndex,
+                addEntriesFrom);
+
+        // When persistence successfully completes for each new log entry appended, we need to determine if we
+        // should capture a snapshot to compact the persisted log. shouldCaptureSnapshot tracks whether or not
+        // one of the log entries has exceeded the log size threshold whereby a snapshot should be taken. However
+        // we don't initiate the snapshot at that log entry but rather after the last log entry has been persisted.
+        // This is done because subsequent log entries after the one that tripped the threshold may have been
+        // applied to the state already, as the persistence callback occurs async, and we want those entries
+        // purged from the persisted log as well.
+        final AtomicBoolean shouldCaptureSnapshot = new AtomicBoolean(false);
+        final Consumer<ReplicatedLogEntry> appendAndPersistCallback = logEntry -> {
+            final List<ReplicatedLogEntry> entries = appendEntries.getEntries();
+            final ReplicatedLogEntry lastEntryToAppend = entries.get(entries.size() - 1);
+            if (shouldCaptureSnapshot.get() && logEntry == lastEntryToAppend) {
+                context.getSnapshotManager().capture(context.getReplicatedLog().last(), getReplicatedToAllIndex());
+            }
+        };
+
+        // Append any new entries not already in the log
+        for (int i = addEntriesFrom; i < numLogEntries; i++) {
+            ReplicatedLogEntry entry = appendEntries.getEntries().get(i);
+
+            log.debug("{}: Append entry to log {}", logName(), entry.getData());
+
+            context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback, false);
+
+            shouldCaptureSnapshot.compareAndSet(false,
+                    context.getReplicatedLog().shouldCaptureSnapshot(entry.getIndex()));
+
+            if (entry.getData() instanceof ServerConfigurationPayload) {
+                context.updatePeerIds((ServerConfigurationPayload)entry.getData());
+            }
+        }
+
+        log.debug("{}: Log size is now {}", logName(), context.getReplicatedLog().size());
+
+        return true;
+    }
+
+    private boolean isOutOfSync(final AppendEntries appendEntries, final ActorRef sender) {
 
         final long lastIndex = lastIndex();
         if (lastIndex == -1 && appendEntries.getPrevLogIndex() != -1) {
@@ -337,21 +344,27 @@ public class Follower extends AbstractRaftActorBehavior {
 
             log.info("{}: The followers log is empty and the senders prevLogIndex is {}", logName(),
                 appendEntries.getPrevLogIndex());
+
+            sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
             return true;
         }
 
         if (lastIndex > -1) {
             if (isLogEntryPresent(appendEntries.getPrevLogIndex())) {
-                final long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex());
-                if (prevLogTerm != appendEntries.getPrevLogTerm()) {
+                final long leadersPrevLogTermInFollowersLogOrSnapshot =
+                        getLogEntryOrSnapshotTerm(appendEntries.getPrevLogIndex());
+                if (leadersPrevLogTermInFollowersLogOrSnapshot != appendEntries.getPrevLogTerm()) {
 
                     // The follower's log is out of sync because the Leader's prevLogIndex entry does exist
-                    // in the follower's log but it has a different term in it
+                    // in the follower's log or snapshot but it has a different term.
 
                     log.info("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append "
-                            + "entries prevLogTerm {} - lastIndex: {}, snapshotIndex: {}", logName(),
-                            appendEntries.getPrevLogIndex(), prevLogTerm, appendEntries.getPrevLogTerm(), lastIndex,
-                            context.getReplicatedLog().getSnapshotIndex());
+                        + "entries prevLogTerm {} - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName(),
+                        appendEntries.getPrevLogIndex(), leadersPrevLogTermInFollowersLogOrSnapshot,
+                        appendEntries.getPrevLogTerm(), lastIndex, context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().getSnapshotTerm());
+
+                    sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
                     return true;
                 }
             } else if (appendEntries.getPrevLogIndex() != -1) {
@@ -359,8 +372,10 @@ public class Follower extends AbstractRaftActorBehavior {
                 // The follower's log is out of sync because the Leader's prevLogIndex entry was not found in it's log
 
                 log.info("{}: The log is not empty but the prevLogIndex {} was not found in it - lastIndex: {}, "
-                        + "snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex,
-                        context.getReplicatedLog().getSnapshotIndex());
+                        + "snapshotIndex: {}, snapshotTerm: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex,
+                        context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm());
+
+                sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
                 return true;
             }
         }
@@ -372,14 +387,22 @@ public class Follower extends AbstractRaftActorBehavior {
                 // the previous entry in it's in-memory journal
 
                 log.info("{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the "
-                        + "in-memory journal", logName(), appendEntries.getReplicatedToAllIndex());
+                        + "in-memory journal - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName(),
+                        appendEntries.getReplicatedToAllIndex(), lastIndex,
+                        context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm());
+
+                sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
                 return true;
             }
 
             final List<ReplicatedLogEntry> entries = appendEntries.getEntries();
             if (entries.size() > 0 && !isLogEntryPresent(entries.get(0).getIndex() - 1)) {
                 log.info("{}: Cannot append entries because the calculated previousIndex {} was not found in the "
-                        + "in-memory journal", logName(), entries.get(0).getIndex() - 1);
+                        + "in-memory journal - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName(),
+                        entries.get(0).getIndex() - 1, lastIndex, context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().getSnapshotTerm());
+
+                sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
                 return true;
             }
         }
@@ -387,6 +410,21 @@ public class Follower extends AbstractRaftActorBehavior {
         return false;
     }
 
+    private void sendOutOfSyncAppendEntriesReply(final ActorRef sender, final boolean forceInstallSnapshot,
+            final short leaderRaftVersion) {
+        // We found that the log was out of sync so just send a negative reply.
+        final AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(),
+                lastTerm(), context.getPayloadVersion(), forceInstallSnapshot, needsLeaderAddress(),
+                leaderRaftVersion);
+
+        log.info("{}: Follower is out-of-sync so sending negative reply: {}", logName(), reply);
+        sender.tell(reply, actor());
+    }
+
+    private boolean needsLeaderAddress() {
+        return context.getPeerAddress(leaderId) == null;
+    }
+
     @Override
     protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
         final AppendEntriesReply appendEntriesReply) {
@@ -399,6 +437,11 @@ public class Follower extends AbstractRaftActorBehavior {
         return this;
     }
 
+    @Override
+    final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) {
+        return new ApplyState(null, null, entry);
+    }
+
     @Override
     public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
         if (message instanceof ElectionTimeout || message instanceof TimeoutNow) {
@@ -418,7 +461,7 @@ public class Follower extends AbstractRaftActorBehavior {
         // If RPC request or response contains term T > currentTerm:
         // set currentTerm = T, convert to follower (§5.1)
         // This applies to all RPC messages and responses
-        if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+        if (rpc.getTerm() > context.getTermInformation().getCurrentTerm() && shouldUpdateTerm(rpc)) {
             log.info("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
                 logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
 
@@ -464,6 +507,10 @@ public class Follower extends AbstractRaftActorBehavior {
                 if (isLeaderAvailabilityKnown() && lastLeaderMessageInterval < maxElectionTimeout) {
                     log.debug("{}: Received ElectionTimeout but leader appears to be available", logName());
                     scheduleElection(electionDuration());
+                } else if (isThisFollowerIsolated()) {
+                    log.debug("{}: this follower is isolated. Do not switch to Candidate for now.", logName());
+                    setLeaderId(null);
+                    scheduleElection(electionDuration());
                 } else {
                     log.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
                     return internalSwitchBehavior(RaftState.Candidate);
@@ -533,6 +580,40 @@ public class Follower extends AbstractRaftActorBehavior {
         return false;
     }
 
+    private boolean isThisFollowerIsolated() {
+        final Optional<Cluster> maybeCluster = context.getCluster();
+        if (!maybeCluster.isPresent()) {
+            return false;
+        }
+
+        final Cluster cluster = maybeCluster.get();
+        final Member selfMember = cluster.selfMember();
+
+        final CurrentClusterState state = cluster.state();
+        final Set<Member> unreachable = state.getUnreachable();
+        final Iterable<Member> members = state.getMembers();
+
+        log.debug("{}: Checking if this node is isolated in the cluster unreachable set {},"
+                        + "all members {} self member: {}", logName(), unreachable, members, selfMember);
+
+        // no unreachable peers means we cannot be isolated
+        if (unreachable.size() == 0) {
+            return false;
+        }
+
+        final Set<Member> membersToCheck = new HashSet<>();
+        members.forEach(membersToCheck::add);
+
+        membersToCheck.removeAll(unreachable);
+
+        // check if the only member not unreachable is us
+        if (membersToCheck.size() == 1 && membersToCheck.iterator().next().equals(selfMember)) {
+            return true;
+        }
+
+        return false;
+    }
+
     private void handleInstallSnapshot(final ActorRef sender, final InstallSnapshot installSnapshot) {
 
         log.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
@@ -564,7 +645,7 @@ public class Follower extends AbstractRaftActorBehavior {
                         installSnapshot.getLastIncludedTerm(),
                         context.getTermInformation().getCurrentTerm(),
                         context.getTermInformation().getVotedFor(),
-                        installSnapshot.getServerConfig().orNull());
+                        installSnapshot.getServerConfig().orElse(null));
 
                 ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
                     @Override