X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractRaftActorBehavior.java;h=fd2fbd332c7a58bab6f60b01e37b2193ad98c3e7;hp=087c656b1836daaf9c05ac7b91e32eba1057c02c;hb=HEAD;hpb=1ffd1f44c4beacdb28683c028bc0eaa209731098 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 087c656b18..055a053500 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -5,16 +5,19 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft.behaviors; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import akka.actor.Cancellable; -import com.google.common.base.Preconditions; +import akka.cluster.Cluster; +import akka.cluster.Member; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.Random; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; @@ -22,6 +25,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; @@ -64,28 +68,21 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { private long replicatedToAllIndex = -1; AbstractRaftActorBehavior(final RaftActorContext context, final RaftState state) { - this.context = Preconditions.checkNotNull(context); - this.state = Preconditions.checkNotNull(state); - this.log = context.getLogger(); + this.context = requireNonNull(context); + this.state = requireNonNull(state); + log = context.getLogger(); logName = String.format("%s (%s)", context.getId(), state); } public static RaftActorBehavior createBehavior(final RaftActorContext context, final RaftState state) { - switch (state) { - case Candidate: - return new Candidate(context); - case Follower: - return new Follower(context); - case IsolatedLeader: - return new IsolatedLeader(context); - case Leader: - return new Leader(context); - case PreLeader: - return new PreLeader(context); - default: - throw new IllegalArgumentException("Unhandled state " + state); - } + return switch (state) { + case Candidate -> new Candidate(context); + case Follower -> new Follower(context); + case IsolatedLeader -> new IsolatedLeader(context); + case Leader -> new Leader(context); + case PreLeader -> new PreLeader(context); + }; } @Override @@ -208,10 +205,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // the log with the later term is more up-to-date. If the logs // end with the same term, then whichever log is longer is // more up-to-date. - if (requestVote.getLastLogTerm() > lastTerm()) { - candidateLatest = true; - } else if (requestVote.getLastLogTerm() == lastTerm() - && requestVote.getLastLogIndex() >= lastIndex()) { + if (requestVote.getLastLogTerm() > lastTerm() + || requestVote.getLastLogTerm() == lastTerm() && requestVote.getLastLogIndex() >= lastIndex()) { candidateLatest = true; } @@ -243,7 +238,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @return a random election duration */ protected FiniteDuration electionDuration() { - long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance()); + long variance = ThreadLocalRandom.current().nextInt(context.getConfigParams().getElectionTimeVariance()); return context.getConfigParams().getElectionTimeOutInterval().$plus( new FiniteDuration(variance, TimeUnit.MILLISECONDS)); } @@ -266,6 +261,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param interval the duration after which we should trigger a new election */ + // Non-final for testing protected void scheduleElection(final FiniteDuration interval) { stopElection(); @@ -297,7 +293,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @return the actor */ - protected ActorRef actor() { + protected final ActorRef actor() { return context.getActor(); } @@ -319,15 +315,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return context.getReplicatedLog().lastIndex(); } - /** - * Removes and returns the ClientRequestTracker for the specified log index. - * @param logIndex the log index - * @return the ClientRequestTracker or null if none available - */ - protected ClientRequestTracker removeClientRequestTracker(final long logIndex) { - return null; - } - /** * Returns the actual index of the entry in replicated log for the given index or -1 if not found. * @@ -392,13 +379,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // Send a local message to the local RaftActor (it's derived class to be // specific to apply the log to it's index) - final ApplyState applyState; - final ClientRequestTracker tracker = removeClientRequestTracker(i); - if (tracker != null) { - applyState = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry); - } else { - applyState = new ApplyState(null, null, replicatedLogEntry); - } + final ApplyState applyState = getApplyStateFor(replicatedLogEntry); log.debug("{}: Setting last applied to {}", logName(), i); @@ -420,16 +401,24 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor()); } + /** + * Create an ApplyState message for a particular log entry so we can determine how to apply this entry. + * + * @param entry the log entry + * @return ApplyState for this entry + */ + abstract ApplyState getApplyStateFor(ReplicatedLogEntry entry); + @Override public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) { - if (message instanceof AppendEntries) { - return appendEntries(sender, (AppendEntries) message); - } else if (message instanceof AppendEntriesReply) { - return handleAppendEntriesReply(sender, (AppendEntriesReply) message); - } else if (message instanceof RequestVote) { - return requestVote(sender, (RequestVote) message); - } else if (message instanceof RequestVoteReply) { - return handleRequestVoteReply(sender, (RequestVoteReply) message); + if (message instanceof AppendEntries appendEntries) { + return appendEntries(sender, appendEntries); + } else if (message instanceof AppendEntriesReply appendEntriesReply) { + return handleAppendEntriesReply(sender, appendEntriesReply); + } else if (message instanceof RequestVote requestVote) { + return requestVote(sender, requestVote); + } else if (message instanceof RequestVoteReply requestVoteReply) { + return handleRequestVoteReply(sender, requestVoteReply); } else { return null; } @@ -450,12 +439,12 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return this; } - log.info("{} :- Switching from behavior {} to {}, election term: {}", logName(), this.state(), + log.info("{} :- Switching from behavior {} to {}, election term: {}", logName(), state(), newBehavior.state(), context.getTermInformation().getCurrentTerm()); try { close(); } catch (RuntimeException e) { - log.error("{}: Failed to close behavior : {}", logName(), this.state(), e); + log.error("{}: Failed to close behavior : {}", logName(), state(), e); } return newBehavior; } @@ -499,7 +488,40 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } } - protected String getId() { + protected final String getId() { return context.getId(); } + + // Check whether we should update the term. In case of half-connected nodes, we want to ignore RequestVote + // messages, as the candidate is not able to receive our response. + protected boolean shouldUpdateTerm(final RaftRPC rpc) { + if (!(rpc instanceof RequestVote requestVote)) { + return true; + } + + log.debug("{}: Found higher term in RequestVote rpc, verifying whether it's safe to update term.", logName()); + final Optional maybeCluster = context.getCluster(); + if (!maybeCluster.isPresent()) { + return true; + } + + final Cluster cluster = maybeCluster.orElseThrow(); + + final Set unreachable = cluster.state().getUnreachable(); + log.debug("{}: Cluster state: {}", logName(), unreachable); + + for (Member member : unreachable) { + for (String role : member.getRoles()) { + if (requestVote.getCandidateId().startsWith(role)) { + log.debug("{}: Unreachable member: {}, matches candidateId in: {}, not updating term", logName(), + member, requestVote); + return false; + } + } + } + + log.debug("{}: Candidate in requestVote:{} with higher term appears reachable, updating term.", logName(), + requestVote); + return true; + } }