X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractRaftActorBehavior.java;h=e714fddb749ad902d9fada289d4cc3425fa6cb04;hp=0b428fee49b14f700f080d046f480ca5ba5ca414;hb=73ab61a037dd2489600acbc1eaf6f9ee549c204a;hpb=08221ab20d1632f7c1995d5e1038411a89bc4d4a diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 0b428fee49..e714fddb74 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -5,15 +5,19 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft.behaviors; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import akka.actor.Cancellable; -import com.google.common.base.Preconditions; -import java.util.Random; +import akka.cluster.Cluster; +import akka.cluster.Member; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; @@ -21,6 +25,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; @@ -39,6 +44,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { /** * Used for message logging. */ + @SuppressFBWarnings("SLF4J_LOGGER_SHOULD_BE_PRIVATE") protected final Logger log; /** @@ -62,9 +68,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { private long replicatedToAllIndex = -1; AbstractRaftActorBehavior(final RaftActorContext context, final RaftState state) { - this.context = Preconditions.checkNotNull(context); - this.state = Preconditions.checkNotNull(state); - this.log = context.getLogger(); + this.context = requireNonNull(context); + this.state = requireNonNull(state); + log = context.getLogger(); logName = String.format("%s (%s)", context.getId(), state); } @@ -96,7 +102,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } @Override - public void setReplicatedToAllIndex(long replicatedToAllIndex) { + public void setReplicatedToAllIndex(final long replicatedToAllIndex) { this.replicatedToAllIndex = replicatedToAllIndex; } @@ -127,7 +133,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param appendEntries the message * @return a new behavior if it was changed or the current behavior */ - protected RaftActorBehavior appendEntries(ActorRef sender, AppendEntries appendEntries) { + protected RaftActorBehavior appendEntries(final ActorRef sender, final AppendEntries appendEntries) { // 1. Reply false if term < currentTerm (§5.1) if (appendEntries.getTerm() < currentTerm()) { @@ -135,7 +141,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { appendEntries.getTerm(), currentTerm()); sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm(), - context.getPayloadVersion()), actor()); + context.getPayloadVersion(), false, false, appendEntries.getLeaderRaftVersion()), actor()); return this; } @@ -165,9 +171,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param requestVote the message * @return a new behavior if it was changed or the current behavior */ - protected RaftActorBehavior requestVote(ActorRef sender, RequestVote requestVote) { + protected RaftActorBehavior requestVote(final ActorRef sender, final RequestVote requestVote) { - log.debug("{}: In requestVote: {}", logName(), requestVote); + log.debug("{}: In requestVote: {} - currentTerm: {}, votedFor: {}, lastIndex: {}, lastTerm: {}", logName(), + requestVote, currentTerm(), votedFor(), lastIndex(), lastTerm()); boolean grantVote = canGrantVote(requestVote); @@ -184,7 +191,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return this; } - protected boolean canGrantVote(RequestVote requestVote) { + protected boolean canGrantVote(final RequestVote requestVote) { boolean grantVote = false; // Reply false if term < currentTerm (§5.1) @@ -205,10 +212,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // the log with the later term is more up-to-date. If the logs // end with the same term, then whichever log is longer is // more up-to-date. - if (requestVote.getLastLogTerm() > lastTerm()) { - candidateLatest = true; - } else if (requestVote.getLastLogTerm() == lastTerm() - && requestVote.getLastLogIndex() >= lastIndex()) { + if (requestVote.getLastLogTerm() > lastTerm() + || requestVote.getLastLogTerm() == lastTerm() && requestVote.getLastLogIndex() >= lastIndex()) { candidateLatest = true; } @@ -240,7 +245,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @return a random election duration */ protected FiniteDuration electionDuration() { - long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance()); + long variance = ThreadLocalRandom.current().nextInt(context.getConfigParams().getElectionTimeVariance()); return context.getConfigParams().getElectionTimeOutInterval().$plus( new FiniteDuration(variance, TimeUnit.MILLISECONDS)); } @@ -263,7 +268,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param interval the duration after which we should trigger a new election */ - protected void scheduleElection(FiniteDuration interval) { + // Non-final for testing + protected void scheduleElection(final FiniteDuration interval) { stopElection(); // Schedule an election. When the scheduler triggers an ElectionTimeout message is sent to itself @@ -294,7 +300,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @return the actor */ - protected ActorRef actor() { + protected final ActorRef actor() { return context.getActor(); } @@ -316,21 +322,12 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return context.getReplicatedLog().lastIndex(); } - /** - * Removes and returns the ClientRequestTracker for the specified log index. - * @param logIndex the log index - * @return the ClientRequestTracker or null if none available - */ - protected ClientRequestTracker removeClientRequestTracker(long logIndex) { - return null; - } - /** * Returns the actual index of the entry in replicated log for the given index or -1 if not found. * * @return the log entry index or -1 if not found */ - protected long getLogEntryIndex(long index) { + protected long getLogEntryIndex(final long index) { if (index == context.getReplicatedLog().getSnapshotIndex()) { return context.getReplicatedLog().getSnapshotIndex(); } @@ -344,11 +341,11 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } /** - * Returns the actual term of the entry in replicated log for the given index or -1 if not found. + * Returns the actual term of the entry in the replicated log for the given index or -1 if not found. * * @return the log entry term or -1 if not found */ - protected long getLogEntryTerm(long index) { + protected long getLogEntryTerm(final long index) { if (index == context.getReplicatedLog().getSnapshotIndex()) { return context.getReplicatedLog().getSnapshotTerm(); } @@ -361,6 +358,20 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return -1; } + /** + * Returns the actual term of the entry in the replicated log for the given index or, if not present, returns the + * snapshot term if the given index is in the snapshot or -1 otherwise. + * + * @return the term or -1 otherwise + */ + protected long getLogEntryOrSnapshotTerm(final long index) { + if (context.getReplicatedLog().isInSnapshot(index)) { + return context.getReplicatedLog().getSnapshotTerm(); + } + + return getLogEntryTerm(index); + } + /** * Applies the log entries up to the specified index that is known to be committed to the state machine. * @@ -375,13 +386,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // Send a local message to the local RaftActor (it's derived class to be // specific to apply the log to it's index) - final ApplyState applyState; - final ClientRequestTracker tracker = removeClientRequestTracker(i); - if (tracker != null) { - applyState = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry); - } else { - applyState = new ApplyState(null, null, replicatedLogEntry); - } + final ApplyState applyState = getApplyStateFor(replicatedLogEntry); log.debug("{}: Setting last applied to {}", logName(), i); @@ -403,8 +408,16 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor()); } + /** + * Create an ApplyState message for a particular log entry so we can determine how to apply this entry. + * + * @param entry the log entry + * @return ApplyState for this entry + */ + abstract ApplyState getApplyStateFor(ReplicatedLogEntry entry); + @Override - public RaftActorBehavior handleMessage(ActorRef sender, Object message) { + public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) { if (message instanceof AppendEntries) { return appendEntries(sender, (AppendEntries) message); } else if (message instanceof AppendEntriesReply) { @@ -419,16 +432,16 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } @Override - public RaftActorBehavior switchBehavior(RaftActorBehavior behavior) { + public RaftActorBehavior switchBehavior(final RaftActorBehavior behavior) { return internalSwitchBehavior(behavior); } - protected RaftActorBehavior internalSwitchBehavior(RaftState newState) { + protected RaftActorBehavior internalSwitchBehavior(final RaftState newState) { return internalSwitchBehavior(createBehavior(context, newState)); } @SuppressWarnings("checkstyle:IllegalCatch") - protected RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) { + protected RaftActorBehavior internalSwitchBehavior(final RaftActorBehavior newBehavior) { if (!context.getRaftPolicy().automaticElectionsEnabled()) { return this; } @@ -444,7 +457,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } - protected int getMajorityVoteCount(int numPeers) { + protected int getMajorityVoteCount(final int numPeers) { // Votes are required from a majority of the peers including self. // The numMajority field therefore stores a calculated value // of the number of votes required for this candidate to win an @@ -482,7 +495,41 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } } - protected String getId() { + protected final String getId() { return context.getId(); } + + // Check whether we should update the term. In case of half-connected nodes, we want to ignore RequestVote + // messages, as the candidate is not able to receive our response. + protected boolean shouldUpdateTerm(final RaftRPC rpc) { + if (!(rpc instanceof RequestVote)) { + return true; + } + + final RequestVote requestVote = (RequestVote) rpc; + log.debug("{}: Found higher term in RequestVote rpc, verifying whether it's safe to update term.", logName()); + final Optional maybeCluster = context.getCluster(); + if (!maybeCluster.isPresent()) { + return true; + } + + final Cluster cluster = maybeCluster.get(); + + final Set unreachable = cluster.state().getUnreachable(); + log.debug("{}: Cluster state: {}", logName(), unreachable); + + for (Member member : unreachable) { + for (String role : member.getRoles()) { + if (requestVote.getCandidateId().startsWith(role)) { + log.debug("{}: Unreachable member: {}, matches candidateId in: {}, not updating term", logName(), + member, requestVote); + return false; + } + } + } + + log.debug("{}: Candidate in requestVote:{} with higher term appears reachable, updating term.", logName(), + requestVote); + return true; + } }