X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollower.java;h=4913d98c36e9090253cf0ea4ece40873a2d655cd;hb=703c260bebba0a8f7efa85edef4ad4d359c59cda;hp=4fa0e161db16ffc2ada57fd52cc6af22ce8160fd;hpb=88e2974b8d391d6e91a6338b0a1b8dbf966a8a71;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 4fa0e161db..4913d98c36 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -5,7 +5,6 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; @@ -15,22 +14,24 @@ import akka.cluster.Cluster; import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.Member; import akka.cluster.MemberStatus; -import akka.japi.Procedure; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; -import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import javax.annotation.Nullable; +import java.util.function.Consumer; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.messaging.MessageAssembler; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; @@ -52,6 +53,7 @@ import org.opendaylight.controller.cluster.raft.persisted.Snapshot; * convert to candidate * */ +// Non-final for testing public class Follower extends AbstractRaftActorBehavior { private static final long MAX_ELECTION_TIMEOUT_FACTOR = 18; @@ -68,11 +70,13 @@ public class Follower extends AbstractRaftActorBehavior { this(context, null, (short)-1); } + @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", + justification = "electionDuration() is not final for Candidate override") public Follower(final RaftActorContext context, final String initialLeaderId, final short initialLeaderPayloadVersion) { super(context, RaftState.Follower); - this.leaderId = initialLeaderId; - this.leaderPayloadVersion = initialLeaderPayloadVersion; + leaderId = initialLeaderId; + leaderPayloadVersion = initialLeaderPayloadVersion; initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), context.getConfigParams() .getSyncIndexThreshold()); @@ -94,7 +98,7 @@ public class Follower extends AbstractRaftActorBehavior { } @VisibleForTesting - protected final void setLeaderId(@Nullable final String leaderId) { + protected final void setLeaderId(final @Nullable String leaderId) { this.leaderId = leaderId; } @@ -148,7 +152,8 @@ public class Follower extends AbstractRaftActorBehavior { if (snapshotTracker != null || context.getSnapshotManager().isApplying()) { // if snapshot install is in progress, follower should just acknowledge append entries with a reply. AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true, - lastIndex(), lastTerm(), context.getPayloadVersion()); + lastIndex(), lastTerm(), context.getPayloadVersion(), false, needsLeaderAddress(), + appendEntries.getLeaderRaftVersion()); log.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply); sender.tell(reply, actor()); @@ -160,6 +165,14 @@ public class Follower extends AbstractRaftActorBehavior { leaderId = appendEntries.getLeaderId(); leaderPayloadVersion = appendEntries.getPayloadVersion(); + if (appendEntries.getLeaderAddress().isPresent()) { + final String address = appendEntries.getLeaderAddress().get(); + log.debug("New leader address: {}", address); + + context.setPeerAddress(leaderId, address); + context.getConfigParams().getPeerAddressResolver().setResolved(leaderId, address); + } + // First check if the logs are in sync or not if (isOutOfSync(appendEntries, sender)) { updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId()); @@ -184,7 +197,8 @@ public class Follower extends AbstractRaftActorBehavior { } AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true, - lastIndex, lastTerm(), context.getPayloadVersion()); + lastIndex, lastTerm(), context.getPayloadVersion(), false, needsLeaderAddress(), + appendEntries.getLeaderRaftVersion()); if (log.isTraceEnabled()) { log.trace("{}: handleAppendEntries returning : {}", logName(), reply); @@ -267,14 +281,16 @@ public class Follower extends AbstractRaftActorBehavior { log.info("{}: Could not remove entries - sending reply to force snapshot", logName()); sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, - lastTerm(), context.getPayloadVersion(), true), actor()); + lastTerm(), context.getPayloadVersion(), true, needsLeaderAddress(), + appendEntries.getLeaderRaftVersion()), actor()); return false; } break; } else { sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, - lastTerm(), context.getPayloadVersion(), true), actor()); + lastTerm(), context.getPayloadVersion(), true, needsLeaderAddress(), + appendEntries.getLeaderRaftVersion()), actor()); return false; } } @@ -292,7 +308,7 @@ public class Follower extends AbstractRaftActorBehavior { // applied to the state already, as the persistence callback occurs async, and we want those entries // purged from the persisted log as well. final AtomicBoolean shouldCaptureSnapshot = new AtomicBoolean(false); - final Procedure appendAndPersistCallback = logEntry -> { + final Consumer appendAndPersistCallback = logEntry -> { final List entries = appendEntries.getEntries(); final ReplicatedLogEntry lastEntryToAppend = entries.get(entries.size() - 1); if (shouldCaptureSnapshot.get() && logEntry == lastEntryToAppend) { @@ -311,8 +327,8 @@ public class Follower extends AbstractRaftActorBehavior { shouldCaptureSnapshot.compareAndSet(false, context.getReplicatedLog().shouldCaptureSnapshot(entry.getIndex())); - if (entry.getData() instanceof ServerConfigurationPayload) { - context.updatePeerIds((ServerConfigurationPayload)entry.getData()); + if (entry.getData() instanceof ServerConfigurationPayload serverConfiguration) { + context.updatePeerIds(serverConfiguration); } } @@ -332,24 +348,26 @@ public class Follower extends AbstractRaftActorBehavior { log.info("{}: The followers log is empty and the senders prevLogIndex is {}", logName(), appendEntries.getPrevLogIndex()); - sendOutOfSyncAppendEntriesReply(sender, false); + sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion()); return true; } if (lastIndex > -1) { if (isLogEntryPresent(appendEntries.getPrevLogIndex())) { - final long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex()); - if (prevLogTerm != appendEntries.getPrevLogTerm()) { + final long leadersPrevLogTermInFollowersLogOrSnapshot = + getLogEntryOrSnapshotTerm(appendEntries.getPrevLogIndex()); + if (leadersPrevLogTermInFollowersLogOrSnapshot != appendEntries.getPrevLogTerm()) { // The follower's log is out of sync because the Leader's prevLogIndex entry does exist - // in the follower's log but it has a different term in it + // in the follower's log or snapshot but it has a different term. log.info("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append " - + "entries prevLogTerm {} - lastIndex: {}, snapshotIndex: {}", logName(), - appendEntries.getPrevLogIndex(), prevLogTerm, appendEntries.getPrevLogTerm(), lastIndex, - context.getReplicatedLog().getSnapshotIndex()); + + "entries prevLogTerm {} - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName(), + appendEntries.getPrevLogIndex(), leadersPrevLogTermInFollowersLogOrSnapshot, + appendEntries.getPrevLogTerm(), lastIndex, context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm()); - sendOutOfSyncAppendEntriesReply(sender, false); + sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion()); return true; } } else if (appendEntries.getPrevLogIndex() != -1) { @@ -357,10 +375,10 @@ public class Follower extends AbstractRaftActorBehavior { // The follower's log is out of sync because the Leader's prevLogIndex entry was not found in it's log log.info("{}: The log is not empty but the prevLogIndex {} was not found in it - lastIndex: {}, " - + "snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex, - context.getReplicatedLog().getSnapshotIndex()); + + "snapshotIndex: {}, snapshotTerm: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex, + context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm()); - sendOutOfSyncAppendEntriesReply(sender, false); + sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion()); return true; } } @@ -372,18 +390,22 @@ public class Follower extends AbstractRaftActorBehavior { // the previous entry in it's in-memory journal log.info("{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the " - + "in-memory journal", logName(), appendEntries.getReplicatedToAllIndex()); + + "in-memory journal - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName(), + appendEntries.getReplicatedToAllIndex(), lastIndex, + context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm()); - sendOutOfSyncAppendEntriesReply(sender, false); + sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion()); return true; } final List entries = appendEntries.getEntries(); if (entries.size() > 0 && !isLogEntryPresent(entries.get(0).getIndex() - 1)) { log.info("{}: Cannot append entries because the calculated previousIndex {} was not found in the " - + "in-memory journal", logName(), entries.get(0).getIndex() - 1); + + "in-memory journal - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName(), + entries.get(0).getIndex() - 1, lastIndex, context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm()); - sendOutOfSyncAppendEntriesReply(sender, false); + sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion()); return true; } } @@ -391,15 +413,21 @@ public class Follower extends AbstractRaftActorBehavior { return false; } - private void sendOutOfSyncAppendEntriesReply(final ActorRef sender, boolean forceInstallSnapshot) { + private void sendOutOfSyncAppendEntriesReply(final ActorRef sender, final boolean forceInstallSnapshot, + final short leaderRaftVersion) { // We found that the log was out of sync so just send a negative reply. final AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), - lastTerm(), context.getPayloadVersion(), forceInstallSnapshot); + lastTerm(), context.getPayloadVersion(), forceInstallSnapshot, needsLeaderAddress(), + leaderRaftVersion); log.info("{}: Follower is out-of-sync so sending negative reply: {}", logName(), reply); sender.tell(reply, actor()); } + private boolean needsLeaderAddress() { + return context.getPeerAddress(leaderId) == null; + } + @Override protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender, final AppendEntriesReply appendEntriesReply) { @@ -412,6 +440,11 @@ public class Follower extends AbstractRaftActorBehavior { return this; } + @Override + final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) { + return new ApplyState(null, null, entry); + } + @Override public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) { if (message instanceof ElectionTimeout || message instanceof TimeoutNow) { @@ -422,30 +455,29 @@ public class Follower extends AbstractRaftActorBehavior { return this; } - if (!(message instanceof RaftRPC)) { + if (!(message instanceof RaftRPC rpc)) { // The rest of the processing requires the message to be a RaftRPC return null; } - final RaftRPC rpc = (RaftRPC) message; // If RPC request or response contains term T > currentTerm: // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses - if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { + if (rpc.getTerm() > context.getTermInformation().getCurrentTerm() && shouldUpdateTerm(rpc)) { log.info("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term", logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); context.getTermInformation().updateAndPersist(rpc.getTerm(), null); } - if (rpc instanceof InstallSnapshot) { - handleInstallSnapshot(sender, (InstallSnapshot) rpc); + if (rpc instanceof InstallSnapshot installSnapshot) { + handleInstallSnapshot(sender, installSnapshot); restartLastLeaderMessageTimer(); scheduleElection(electionDuration()); return this; } - if (!(rpc instanceof RequestVote) || canGrantVote((RequestVote) rpc)) { + if (!(rpc instanceof RequestVote requestVote) || canGrantVote(requestVote)) { restartLastLeaderMessageTimer(); scheduleElection(electionDuration()); } @@ -477,6 +509,10 @@ public class Follower extends AbstractRaftActorBehavior { if (isLeaderAvailabilityKnown() && lastLeaderMessageInterval < maxElectionTimeout) { log.debug("{}: Received ElectionTimeout but leader appears to be available", logName()); scheduleElection(electionDuration()); + } else if (isThisFollowerIsolated()) { + log.debug("{}: this follower is isolated. Do not switch to Candidate for now.", logName()); + setLeaderId(null); + scheduleElection(electionDuration()); } else { log.debug("{}: Received ElectionTimeout - switching to Candidate", logName()); return internalSwitchBehavior(RaftState.Candidate); @@ -546,6 +582,36 @@ public class Follower extends AbstractRaftActorBehavior { return false; } + private boolean isThisFollowerIsolated() { + final Optional maybeCluster = context.getCluster(); + if (!maybeCluster.isPresent()) { + return false; + } + + final Cluster cluster = maybeCluster.get(); + final Member selfMember = cluster.selfMember(); + + final CurrentClusterState state = cluster.state(); + final Set unreachable = state.getUnreachable(); + final Iterable members = state.getMembers(); + + log.debug("{}: Checking if this node is isolated in the cluster unreachable set {}," + + "all members {} self member: {}", logName(), unreachable, members, selfMember); + + // no unreachable peers means we cannot be isolated + if (unreachable.isEmpty()) { + return false; + } + + final Set membersToCheck = new HashSet<>(); + members.forEach(membersToCheck::add); + + membersToCheck.removeAll(unreachable); + + // check if the only member not unreachable is us + return membersToCheck.size() == 1 && membersToCheck.iterator().next().equals(selfMember); + } + private void handleInstallSnapshot(final ActorRef sender, final InstallSnapshot installSnapshot) { log.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot); @@ -570,14 +636,14 @@ public class Follower extends AbstractRaftActorBehavior { Snapshot snapshot = Snapshot.create( context.getSnapshotManager().convertSnapshot(snapshotTracker.getSnapshotBytes()), - new ArrayList<>(), + List.of(), installSnapshot.getLastIncludedIndex(), installSnapshot.getLastIncludedTerm(), installSnapshot.getLastIncludedIndex(), installSnapshot.getLastIncludedTerm(), context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(), - installSnapshot.getServerConfig().orNull()); + installSnapshot.getServerConfig().orElse(null)); ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() { @Override @@ -604,8 +670,7 @@ public class Follower extends AbstractRaftActorBehavior { } catch (IOException e) { log.debug("{}: Exception in InstallSnapshot of follower", logName(), e); - sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), - -1, false), actor()); + sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), -1, false), actor()); closeSnapshotTracker(); }