X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractRaftActorBehavior.java;h=fd2fbd332c7a58bab6f60b01e37b2193ad98c3e7;hb=refs%2Fchanges%2F16%2F87616%2F4;hp=400f110865f53a37e64d223f7cbe894c912262f6;hpb=634dfac8eead60f443bf75e749c70d1f2bb29198;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 400f110865..fd2fbd332c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -5,16 +5,19 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft.behaviors; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import akka.actor.Cancellable; -import com.google.common.base.Preconditions; +import akka.cluster.Cluster; +import akka.cluster.Member; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Optional; import java.util.Random; +import java.util.Set; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; @@ -22,6 +25,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; @@ -64,8 +68,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { private long replicatedToAllIndex = -1; AbstractRaftActorBehavior(final RaftActorContext context, final RaftState state) { - this.context = Preconditions.checkNotNull(context); - this.state = Preconditions.checkNotNull(state); + this.context = requireNonNull(context); + this.state = requireNonNull(state); this.log = context.getLogger(); logName = String.format("%s (%s)", context.getId(), state); @@ -137,7 +141,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { appendEntries.getTerm(), currentTerm()); sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm(), - context.getPayloadVersion()), actor()); + context.getPayloadVersion(), false, false, appendEntries.getLeaderRaftVersion()), actor()); return this; } @@ -319,15 +323,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return context.getReplicatedLog().lastIndex(); } - /** - * Removes and returns the ClientRequestTracker for the specified log index. - * @param logIndex the log index - * @return the ClientRequestTracker or null if none available - */ - protected ClientRequestTracker removeClientRequestTracker(final long logIndex) { - return null; - } - /** * Returns the actual index of the entry in replicated log for the given index or -1 if not found. * @@ -347,7 +342,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } /** - * Returns the actual term of the entry in replicated log for the given index or -1 if not found. + * Returns the actual term of the entry in the replicated log for the given index or -1 if not found. * * @return the log entry term or -1 if not found */ @@ -364,6 +359,20 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return -1; } + /** + * Returns the actual term of the entry in the replicated log for the given index or, if not present, returns the + * snapshot term if the given index is in the snapshot or -1 otherwise. + * + * @return the term or -1 otherwise + */ + protected long getLogEntryOrSnapshotTerm(final long index) { + if (context.getReplicatedLog().isInSnapshot(index)) { + return context.getReplicatedLog().getSnapshotTerm(); + } + + return getLogEntryTerm(index); + } + /** * Applies the log entries up to the specified index that is known to be committed to the state machine. * @@ -378,13 +387,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // Send a local message to the local RaftActor (it's derived class to be // specific to apply the log to it's index) - final ApplyState applyState; - final ClientRequestTracker tracker = removeClientRequestTracker(i); - if (tracker != null) { - applyState = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry); - } else { - applyState = new ApplyState(null, null, replicatedLogEntry); - } + final ApplyState applyState = getApplyStateFor(replicatedLogEntry); log.debug("{}: Setting last applied to {}", logName(), i); @@ -406,6 +409,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor()); } + /** + * Create an ApplyState message for a particular log entry so we can determine how to apply this entry. + * + * @param entry the log entry + * @return ApplyState for this entry + */ + abstract ApplyState getApplyStateFor(ReplicatedLogEntry entry); + @Override public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) { if (message instanceof AppendEntries) { @@ -488,4 +499,38 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected String getId() { return context.getId(); } + + // Check whether we should update the term. In case of half-connected nodes, we want to ignore RequestVote + // messages, as the candidate is not able to receive our response. + protected boolean shouldUpdateTerm(final RaftRPC rpc) { + if (!(rpc instanceof RequestVote)) { + return true; + } + + final RequestVote requestVote = (RequestVote) rpc; + log.debug("{}: Found higher term in RequestVote rpc, verifying whether it's safe to update term.", logName()); + final Optional maybeCluster = context.getCluster(); + if (!maybeCluster.isPresent()) { + return true; + } + + final Cluster cluster = maybeCluster.get(); + + final Set unreachable = cluster.state().getUnreachable(); + log.debug("{}: Cluster state: {}", logName(), unreachable); + + for (Member member : unreachable) { + for (String role : member.getRoles()) { + if (requestVote.getCandidateId().startsWith(role)) { + log.debug("{}: Unreachable member: {}, matches candidateId in: {}, not updating term", logName(), + member, requestVote); + return false; + } + } + } + + log.debug("{}: Candidate in requestVote:{} with higher term appears reachable, updating term.", logName(), + requestVote); + return true; + } }