X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractRaftActorBehavior.java;h=375b3779b891dd94e1a3b702f9396853478de265;hb=a0b8be5ce48c0d1e0b573d1952211913c58d4935;hp=d7a8d5abb35bdb258633b65a783aed9e0aa9677d;hpb=583f6075e842a6a37b83bd01e478aebc70c6af73;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index d7a8d5abb3..375b3779b8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -9,13 +9,23 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; +import akka.actor.Cancellable; +import com.google.common.base.Preconditions; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; -import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.slf4j.Logger; +import scala.concurrent.duration.FiniteDuration; /** * Abstract class that represents the behavior of a RaftActor @@ -28,15 +38,69 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; * set currentTerm = T, convert to follower (§5.1) */ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { - /** * Information about the RaftActor whose behavior this class represents */ protected final RaftActorContext context; + /** + * + */ + protected final Logger LOG; + + /** + * + */ + private Cancellable electionCancel = null; + + private long replicatedToAllIndex = -1; + + private final String logName; + + private final RaftState state; + + AbstractRaftActorBehavior(final RaftActorContext context, final RaftState state) { + this.context = Preconditions.checkNotNull(context); + this.state = Preconditions.checkNotNull(state); + this.LOG = context.getLogger(); + + logName = String.format("%s (%s)", context.getId(), state); + } + + public static RaftActorBehavior createBehavior(final RaftActorContext context, final RaftState state) { + switch (state) { + case Candidate: + return new Candidate(context); + case Follower: + return new Follower(context); + case IsolatedLeader: + return new IsolatedLeader(context); + case Leader: + return new Leader(context); + case PreLeader: + return new PreLeader(context); + default: + throw new IllegalArgumentException("Unhandled state " + state); + } + } + + @Override + public final RaftState state() { + return state; + } - protected AbstractRaftActorBehavior(RaftActorContext context) { - this.context = context; + protected final String logName() { + return logName; + } + + @Override + public void setReplicatedToAllIndex(long replicatedToAllIndex) { + this.replicatedToAllIndex = replicatedToAllIndex; + } + + @Override + public long getReplicatedToAllIndex() { + return replicatedToAllIndex; } /** @@ -49,13 +113,40 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender The actor that sent this message * @param appendEntries The AppendEntries message - * @param suggestedState The state that the RaftActor should be in based - * on the base class's processing of the AppendEntries - * message - * @return + * @return a new behavior if it was changed or the current behavior */ - protected abstract RaftState handleAppendEntries(ActorRef sender, - AppendEntries appendEntries, RaftState suggestedState); + protected abstract RaftActorBehavior handleAppendEntries(ActorRef sender, + AppendEntries appendEntries); + + + /** + * appendEntries first processes the AppendEntries message and then + * delegates handling to a specific behavior + * + * @param sender + * @param appendEntries + * @return a new behavior if it was changed or the current behavior + */ + protected RaftActorBehavior appendEntries(ActorRef sender, + AppendEntries appendEntries) { + + // 1. Reply false if term < currentTerm (§5.1) + if (appendEntries.getTerm() < currentTerm()) { + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Cannot append entries because sender term {} is less than {}", + logName(), appendEntries.getTerm(), currentTerm()); + } + + sender.tell( + new AppendEntriesReply(context.getId(), currentTerm(), false, + lastIndex(), lastTerm(), context.getPayloadVersion()), actor() + ); + return this; + } + + + return handleAppendEntries(sender, appendEntries); + } /** * Derived classes should not directly handle AppendEntriesReply messages it @@ -67,32 +158,72 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender The actor that sent this message * @param appendEntriesReply The AppendEntriesReply message - * @param suggestedState The state that the RaftActor should be in based - * on the base class's processing of the - * AppendEntriesReply message - * @return + * @return a new behavior if it was changed or the current behavior */ - - protected abstract RaftState handleAppendEntriesReply(ActorRef sender, - AppendEntriesReply appendEntriesReply, RaftState suggestedState); + protected abstract RaftActorBehavior handleAppendEntriesReply(ActorRef sender, + AppendEntriesReply appendEntriesReply); /** - * Derived classes should not directly handle RequestVote messages it - * should let the base class handle it first. Once the base class handles - * the RequestVote message and does the common actions that are applicable - * in all RaftState's it will delegate the handling of the RequestVote - * message to the derived class to do more state specific handling by calling - * this method + * requestVote handles the RequestVote message. This logic is common + * for all behaviors * - * @param sender The actor that sent this message - * @param requestVote The RequestVote message - * @param suggestedState The state that the RaftActor should be in based - * on the base class's processing of the RequestVote - * message - * @return + * @param sender + * @param requestVote + * @return a new behavior if it was changed or the current behavior */ - protected abstract RaftState handleRequestVote(ActorRef sender, - RequestVote requestVote, RaftState suggestedState); + protected RaftActorBehavior requestVote(ActorRef sender, RequestVote requestVote) { + + LOG.debug("{}: In requestVote: {}", logName(), requestVote); + + boolean grantVote = canGrantVote(requestVote); + + if(grantVote) { + context.getTermInformation().updateAndPersist(requestVote.getTerm(), requestVote.getCandidateId()); + } + + RequestVoteReply reply = new RequestVoteReply(currentTerm(), grantVote); + + LOG.debug("{}: requestVote returning: {}", logName(), reply); + + sender.tell(reply, actor()); + + return this; + } + + protected boolean canGrantVote(RequestVote requestVote){ + boolean grantVote = false; + + // Reply false if term < currentTerm (§5.1) + if (requestVote.getTerm() < currentTerm()) { + grantVote = false; + + // If votedFor is null or candidateId, and candidate’s log is at + // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4) + } else if (votedFor() == null || votedFor() + .equals(requestVote.getCandidateId())) { + + boolean candidateLatest = false; + + // From §5.4.1 + // Raft determines which of two logs is more up-to-date + // by comparing the index and term of the last entries in the + // logs. If the logs have last entries with different terms, then + // the log with the later term is more up-to-date. If the logs + // end with the same term, then whichever log is longer is + // more up-to-date. + if (requestVote.getLastLogTerm() > lastTerm()) { + candidateLatest = true; + } else if ((requestVote.getLastLogTerm() == lastTerm()) + && requestVote.getLastLogIndex() >= lastIndex()) { + candidateLatest = true; + } + + if (candidateLatest) { + grantVote = true; + } + } + return grantVote; + } /** * Derived classes should not directly handle RequestVoteReply messages it @@ -104,59 +235,247 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender The actor that sent this message * @param requestVoteReply The RequestVoteReply message - * @param suggestedState The state that the RaftActor should be in based - * on the base class's processing of the RequestVote - * message - * @return + * @return a new behavior if it was changed or the current behavior */ + protected abstract RaftActorBehavior handleRequestVoteReply(ActorRef sender, + RequestVoteReply requestVoteReply); - protected abstract RaftState handleRequestVoteReply(ActorRef sender, - RequestVoteReply requestVoteReply, RaftState suggestedState); + /** + * + * @return a random election duration + */ + protected FiniteDuration electionDuration() { + long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance()); + return context.getConfigParams().getElectionTimeOutInterval().$plus( + new FiniteDuration(variance, TimeUnit.MILLISECONDS)); + } /** - * @return The derived class should return the state that corresponds to - * it's behavior + * stop the scheduled election */ - protected abstract RaftState state(); + protected void stopElection() { + if (electionCancel != null && !electionCancel.isCancelled()) { + electionCancel.cancel(); + } + } - @Override - public RaftState handleMessage(ActorRef sender, Object message) { - RaftState raftState = state(); - if (message instanceof RaftRPC) { - raftState = applyTerm((RaftRPC) message); + protected boolean canStartElection() { + return context.getRaftPolicy().automaticElectionsEnabled() && context.isVotingMember(); + } + + /** + * schedule a new election + * + * @param interval the duration after which we should trigger a new election + */ + protected void scheduleElection(FiniteDuration interval) { + stopElection(); + + // Schedule an election. When the scheduler triggers an ElectionTimeout message is sent to itself + electionCancel = context.getActorSystem().scheduler().scheduleOnce(interval, context.getActor(), + ElectionTimeout.INSTANCE, context.getActorSystem().dispatcher(), context.getActor()); + } + + /** + * @return the current term + */ + protected long currentTerm() { + return context.getTermInformation().getCurrentTerm(); + } + + /** + * @return the candidate for whom we voted in the current term + */ + protected String votedFor() { + return context.getTermInformation().getVotedFor(); + } + + /** + * @return the actor associated with this behavior + */ + protected ActorRef actor() { + return context.getActor(); + } + + /** + * + * @return the term from the last entry in the log + */ + protected long lastTerm() { + return context.getReplicatedLog().lastTerm(); + } + + /** + * @return the index from the last entry in the log + */ + protected long lastIndex() { + return context.getReplicatedLog().lastIndex(); + } + + /** + * @param logIndex + * @return the client request tracker for the specified logIndex + */ + protected ClientRequestTracker removeClientRequestTracker(long logIndex) { + return null; + } + + /** + * + * @return the log entry index for the given index or -1 if not found + */ + protected long getLogEntryIndex(long index){ + if(index == context.getReplicatedLog().getSnapshotIndex()){ + return context.getReplicatedLog().getSnapshotIndex(); } - if (message instanceof AppendEntries) { - AppendEntries appendEntries = (AppendEntries) message; - if (appendEntries.getLeaderCommit() > context.getLastApplied() - .get()) { - applyLogToStateMachine(appendEntries.getLeaderCommit()); + + ReplicatedLogEntry entry = context.getReplicatedLog().get(index); + if(entry != null){ + return entry.getIndex(); + } + + return -1; + } + + /** + * @return the log entry term for the given index or -1 if not found + */ + protected long getLogEntryTerm(long index){ + if(index == context.getReplicatedLog().getSnapshotIndex()){ + return context.getReplicatedLog().getSnapshotTerm(); + } + + ReplicatedLogEntry entry = context.getReplicatedLog().get(index); + if(entry != null){ + return entry.getTerm(); + } + + return -1; + } + + /** + * Apply the provided index to the state machine + * + * @param index a log index that is known to be committed + */ + protected void applyLogToStateMachine(final long index) { + long newLastApplied = context.getLastApplied(); + // Now maybe we apply to the state machine + for (long i = context.getLastApplied() + 1; i < index + 1; i++) { + + ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(i); + if (replicatedLogEntry != null) { + // Send a local message to the local RaftActor (it's derived class to be + // specific to apply the log to it's index) + + final ApplyState msg; + final ClientRequestTracker tracker = removeClientRequestTracker(i); + if (tracker != null) { + msg = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry); + } else { + msg = new ApplyState(null, null, replicatedLogEntry); + } + + actor().tell(msg, actor()); + newLastApplied = i; + } else { + //if one index is not present in the log, no point in looping + // around as the rest wont be present either + LOG.warn("{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}", + logName(), i, i, index); + break; } - raftState = handleAppendEntries(sender, appendEntries, raftState); + } + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Setting last applied to {}", logName(), newLastApplied); + } + context.setLastApplied(newLastApplied); + + // send a message to persist a ApplyLogEntries marker message into akka's persistent journal + // will be used during recovery + //in case if the above code throws an error and this message is not sent, it would be fine + // as the append entries received later would initiate add this message to the journal + actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor()); + } + + @Override + public RaftActorBehavior handleMessage(ActorRef sender, Object message) { + if (message instanceof AppendEntries) { + return appendEntries(sender, (AppendEntries) message); } else if (message instanceof AppendEntriesReply) { - raftState = - handleAppendEntriesReply(sender, (AppendEntriesReply) message, - raftState); + return handleAppendEntriesReply(sender, (AppendEntriesReply) message); } else if (message instanceof RequestVote) { - raftState = - handleRequestVote(sender, (RequestVote) message, raftState); + return requestVote(sender, (RequestVote) message); } else if (message instanceof RequestVoteReply) { - raftState = - handleRequestVoteReply(sender, (RequestVoteReply) message, - raftState); + return handleRequestVoteReply(sender, (RequestVoteReply) message); + } else { + return null; + } + } + + @Override + public RaftActorBehavior switchBehavior(RaftActorBehavior behavior) { + return internalSwitchBehavior(behavior); + } + + protected RaftActorBehavior internalSwitchBehavior(RaftState newState) { + return internalSwitchBehavior(createBehavior(context, newState)); + } + + protected RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) { + if(!context.getRaftPolicy().automaticElectionsEnabled()) { + return this; + } + + LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state()); + try { + close(); + } catch (Exception e) { + LOG.error("{}: Failed to close behavior : {}", logName(), this.state(), e); } - return raftState; + return newBehavior; } - private RaftState applyTerm(RaftRPC rpc) { - if (rpc.getTerm() > context.getTermInformation().getCurrentTerm() - .get()) { - context.getTermInformation().update(rpc.getTerm(), null); - return RaftState.Follower; + + protected int getMajorityVoteCount(int numPeers) { + // Votes are required from a majority of the peers including self. + // The numMajority field therefore stores a calculated value + // of the number of votes required for this candidate to win an + // election based on it's known peers. + // If a peer was added during normal operation and raft replicas + // came to know about them then the new peer would also need to be + // taken into consideration when calculating this value. + // Here are some examples for what the numMajority would be for n + // peers + // 0 peers = 1 numMajority -: (0 + 1) / 2 + 1 = 1 + // 2 peers = 2 numMajority -: (2 + 1) / 2 + 1 = 2 + // 4 peers = 3 numMajority -: (4 + 1) / 2 + 1 = 3 + + int numMajority = 0; + if (numPeers > 0) { + int self = 1; + numMajority = (numPeers + self) / 2 + 1; + } + return numMajority; + + } + + + /** + * Performs a snapshot with no capture on the replicated log. + * It clears the log from the supplied index or last-applied-1 which ever is minimum. + * + * @param snapshotCapturedIndex + */ + protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) { + long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex); + + if(actualIndex != -1){ + setReplicatedToAllIndex(actualIndex); } - return state(); } - private void applyLogToStateMachine(long index) { - context.getLastApplied().set(index); + protected String getId(){ + return context.getId(); } }