X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollower.java;h=288ce32a64ec21286adda9cf3672f7b53d1bf506;hp=4fa4dbfb57b34f620eecd6b82710dffc754cd88f;hb=d3d04140ac3d7a2a90a3b953cc4ea27fca2bfc32;hpb=b4bf55727093657662d8c16a50fa85f87978a586 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 4fa4dbfb57..288ce32a64 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -14,22 +14,24 @@ import akka.cluster.Cluster; import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.Member; import akka.cluster.MemberStatus; -import akka.japi.Procedure; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.messaging.MessageAssembler; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; @@ -303,7 +305,7 @@ public class Follower extends AbstractRaftActorBehavior { // applied to the state already, as the persistence callback occurs async, and we want those entries // purged from the persisted log as well. final AtomicBoolean shouldCaptureSnapshot = new AtomicBoolean(false); - final Procedure appendAndPersistCallback = logEntry -> { + final Consumer appendAndPersistCallback = logEntry -> { final List entries = appendEntries.getEntries(); final ReplicatedLogEntry lastEntryToAppend = entries.get(entries.size() - 1); if (shouldCaptureSnapshot.get() && logEntry == lastEntryToAppend) { @@ -408,8 +410,8 @@ public class Follower extends AbstractRaftActorBehavior { return false; } - private void sendOutOfSyncAppendEntriesReply(final ActorRef sender, boolean forceInstallSnapshot, - short leaderRaftVersion) { + private void sendOutOfSyncAppendEntriesReply(final ActorRef sender, final boolean forceInstallSnapshot, + final short leaderRaftVersion) { // We found that the log was out of sync so just send a negative reply. final AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm(), context.getPayloadVersion(), forceInstallSnapshot, needsLeaderAddress(), @@ -435,6 +437,11 @@ public class Follower extends AbstractRaftActorBehavior { return this; } + @Override + final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) { + return new ApplyState(null, null, entry); + } + @Override public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) { if (message instanceof ElectionTimeout || message instanceof TimeoutNow) { @@ -454,7 +461,7 @@ public class Follower extends AbstractRaftActorBehavior { // If RPC request or response contains term T > currentTerm: // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses - if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { + if (rpc.getTerm() > context.getTermInformation().getCurrentTerm() && shouldUpdateTerm(rpc)) { log.info("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term", logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); @@ -500,6 +507,10 @@ public class Follower extends AbstractRaftActorBehavior { if (isLeaderAvailabilityKnown() && lastLeaderMessageInterval < maxElectionTimeout) { log.debug("{}: Received ElectionTimeout but leader appears to be available", logName()); scheduleElection(electionDuration()); + } else if (isThisFollowerIsolated()) { + log.debug("{}: this follower is isolated. Do not switch to Candidate for now.", logName()); + setLeaderId(null); + scheduleElection(electionDuration()); } else { log.debug("{}: Received ElectionTimeout - switching to Candidate", logName()); return internalSwitchBehavior(RaftState.Candidate); @@ -569,6 +580,40 @@ public class Follower extends AbstractRaftActorBehavior { return false; } + private boolean isThisFollowerIsolated() { + final Optional maybeCluster = context.getCluster(); + if (!maybeCluster.isPresent()) { + return false; + } + + final Cluster cluster = maybeCluster.get(); + final Member selfMember = cluster.selfMember(); + + final CurrentClusterState state = cluster.state(); + final Set unreachable = state.getUnreachable(); + final Iterable members = state.getMembers(); + + log.debug("{}: Checking if this node is isolated in the cluster unreachable set {}," + + "all members {} self member: {}", logName(), unreachable, members, selfMember); + + // no unreachable peers means we cannot be isolated + if (unreachable.size() == 0) { + return false; + } + + final Set membersToCheck = new HashSet<>(); + members.forEach(membersToCheck::add); + + membersToCheck.removeAll(unreachable); + + // check if the only member not unreachable is us + if (membersToCheck.size() == 1 && membersToCheck.iterator().next().equals(selfMember)) { + return true; + } + + return false; + } + private void handleInstallSnapshot(final ActorRef sender, final InstallSnapshot installSnapshot) { log.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot); @@ -600,7 +645,7 @@ public class Follower extends AbstractRaftActorBehavior { installSnapshot.getLastIncludedTerm(), context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(), - installSnapshot.getServerConfig().orNull()); + installSnapshot.getServerConfig().orElse(null)); ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() { @Override