X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractRaftActorBehavior.java;h=eed74bba82fbf8a4f8e4c1b5431887451c49d59d;hb=33a40451c53d0cbf66407bd3e217baf904aa7088;hp=ae1baec471ef1d08abdcbb6d9c56052b95a87ffa;hpb=7be62e955c32ff7fa10753c4307199b287b1904c;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index ae1baec471..eed74bba82 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -10,12 +10,16 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.Cancellable; +import akka.event.LoggingAdapter; +import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; -import org.opendaylight.controller.cluster.raft.RaftState; -import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.SerializationUtils; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; -import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import scala.concurrent.duration.FiniteDuration; @@ -41,24 +45,24 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected final RaftActorContext context; /** - * The maximum election time variance + * */ - private static final int ELECTION_TIME_MAX_VARIANCE = 100; + protected final LoggingAdapter LOG; /** - * The interval in which a new election would get triggered if no leader is found + * */ - private static final long ELECTION_TIME_INTERVAL = Leader.HEART_BEAT_INTERVAL.toMillis() * 2; + private Cancellable electionCancel = null; /** * */ - - private Cancellable electionCancel = null; + protected String leaderId = null; protected AbstractRaftActorBehavior(RaftActorContext context) { this.context = context; + this.LOG = context.getLogger(); } /** @@ -71,13 +75,40 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender The actor that sent this message * @param appendEntries The AppendEntries message - * @param suggestedState The state that the RaftActor should be in based - * on the base class's processing of the AppendEntries - * message * @return */ - protected abstract RaftState handleAppendEntries(ActorRef sender, - AppendEntries appendEntries, RaftState suggestedState); + protected abstract RaftActorBehavior handleAppendEntries(ActorRef sender, + AppendEntries appendEntries); + + + /** + * appendEntries first processes the AppendEntries message and then + * delegates handling to a specific behavior + * + * @param sender + * @param appendEntries + * @return + */ + protected RaftActorBehavior appendEntries(ActorRef sender, + AppendEntries appendEntries) { + + // 1. Reply false if term < currentTerm (§5.1) + if (appendEntries.getTerm() < currentTerm()) { + if(LOG.isDebugEnabled()) { + LOG.debug("Cannot append entries because sender term {} is less than {}", + appendEntries.getTerm(), currentTerm()); + } + + sender.tell( + new AppendEntriesReply(context.getId(), currentTerm(), false, + lastIndex(), lastTerm()), actor() + ); + return this; + } + + + return handleAppendEntries(sender, appendEntries); + } /** * Derived classes should not directly handle AppendEntriesReply messages it @@ -89,27 +120,36 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender The actor that sent this message * @param appendEntriesReply The AppendEntriesReply message - * @param suggestedState The state that the RaftActor should be in based - * on the base class's processing of the - * AppendEntriesReply message * @return */ + protected abstract RaftActorBehavior handleAppendEntriesReply(ActorRef sender, + AppendEntriesReply appendEntriesReply); - protected abstract RaftState handleAppendEntriesReply(ActorRef sender, - AppendEntriesReply appendEntriesReply, RaftState suggestedState); + /** + * requestVote handles the RequestVote message. This logic is common + * for all behaviors + * + * @param sender + * @param requestVote + * @return + */ + protected RaftActorBehavior requestVote(ActorRef sender, + RequestVote requestVote) { - protected RaftState handleRequestVote(ActorRef sender, - RequestVote requestVote, RaftState suggestedState){ + if(LOG.isDebugEnabled()) { + LOG.debug(requestVote.toString()); + } boolean grantVote = false; // Reply false if term < currentTerm (§5.1) - if(requestVote.getTerm() < currentTerm()){ + if (requestVote.getTerm() < currentTerm()) { grantVote = false; - // If votedFor is null or candidateId, and candidate’s log is at - // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4) - } else if (votedFor() == null || votedFor().equals(requestVote.getCandidateId())) { + // If votedFor is null or candidateId, and candidate’s log is at + // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4) + } else if (votedFor() == null || votedFor() + .equals(requestVote.getCandidateId())) { boolean candidateLatest = false; @@ -120,22 +160,23 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // the log with the later term is more up-to-date. If the logs // end with the same term, then whichever log is longer is // more up-to-date. - if(requestVote.getLastLogTerm() > lastTerm()){ + if (requestVote.getLastLogTerm() > lastTerm()) { candidateLatest = true; - } else if((requestVote.getLastLogTerm() == lastTerm()) && requestVote.getLastLogIndex() >= lastTerm()){ + } else if ((requestVote.getLastLogTerm() == lastTerm()) + && requestVote.getLastLogIndex() >= lastIndex()) { candidateLatest = true; } - if(candidateLatest) { + if (candidateLatest) { grantVote = true; - context.getTermInformation().update(requestVote.getTerm(), + context.getTermInformation().updateAndPersist(requestVote.getTerm(), requestVote.getCandidateId()); } } sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor()); - return suggestedState; + return this; } /** @@ -148,31 +189,38 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender The actor that sent this message * @param requestVoteReply The RequestVoteReply message - * @param suggestedState The state that the RaftActor should be in based - * on the base class's processing of the RequestVote - * message * @return */ - - protected abstract RaftState handleRequestVoteReply(ActorRef sender, - RequestVoteReply requestVoteReply, RaftState suggestedState); + protected abstract RaftActorBehavior handleRequestVoteReply(ActorRef sender, + RequestVoteReply requestVoteReply); /** - * @return The derived class should return the state that corresponds to - * it's behavior + * Creates a random election duration + * + * @return */ - protected abstract RaftState state(); - - protected FiniteDuration electionDuration(){ - long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE); - return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, TimeUnit.MILLISECONDS); + protected FiniteDuration electionDuration() { + long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance()); + return context.getConfigParams().getElectionTimeOutInterval().$plus( + new FiniteDuration(variance, TimeUnit.MILLISECONDS)); } - protected void scheduleElection(FiniteDuration interval) { - + /** + * stop the scheduled election + */ + protected void stopElection() { if (electionCancel != null && !electionCancel.isCancelled()) { electionCancel.cancel(); } + } + + /** + * schedule a new election + * + * @param interval + */ + protected void scheduleElection(FiniteDuration interval) { + stopElection(); // Schedule an election. When the scheduler triggers an ElectionTimeout // message is sent to itself @@ -182,70 +230,173 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { context.getActorSystem().dispatcher(), context.getActor()); } - protected long currentTerm(){ + /** + * Get the current term + * @return + */ + protected long currentTerm() { return context.getTermInformation().getCurrentTerm(); } - protected String votedFor(){ + /** + * Get the candidate for whom we voted in the current term + * @return + */ + protected String votedFor() { return context.getTermInformation().getVotedFor(); } - protected ActorRef actor(){ + /** + * Get the actor associated with this behavior + * @return + */ + protected ActorRef actor() { return context.getActor(); } + /** + * Get the term from the last entry in the log + * + * @return + */ protected long lastTerm() { - return context.getReplicatedLog().last().getTerm(); + return context.getReplicatedLog().lastTerm(); } + /** + * Get the index from the last entry in the log + * + * @return + */ protected long lastIndex() { - return context.getReplicatedLog().last().getIndex(); + return context.getReplicatedLog().lastIndex(); } + /** + * Find the client request tracker for a specific logIndex + * + * @param logIndex + * @return + */ + protected ClientRequestTracker findClientRequestTracker(long logIndex) { + return null; + } - @Override - public RaftState handleMessage(ActorRef sender, Object message) { - RaftState raftState = state(); - if (message instanceof RaftRPC) { - raftState = applyTerm((RaftRPC) message); + /** + * Find the client request tracker for a specific logIndex + * + * @param logIndex + * @return + */ + protected ClientRequestTracker removeClientRequestTracker(long logIndex) { + return null; + } + + + /** + * Find the log index from the previous to last entry in the log + * + * @return + */ + protected long prevLogIndex(long index){ + ReplicatedLogEntry prevEntry = + context.getReplicatedLog().get(index - 1); + if (prevEntry != null) { + return prevEntry.getIndex(); } - if (message instanceof AppendEntries) { - AppendEntries appendEntries = (AppendEntries) message; - if (appendEntries.getLeaderCommit() > context.getLastApplied()) { - applyLogToStateMachine(appendEntries.getLeaderCommit()); + return -1; + } + + /** + * Find the log term from the previous to last entry in the log + * @return + */ + protected long prevLogTerm(long index){ + ReplicatedLogEntry prevEntry = + context.getReplicatedLog().get(index - 1); + if (prevEntry != null) { + return prevEntry.getTerm(); + } + return -1; + } + + /** + * Apply the provided index to the state machine + * + * @param index a log index that is known to be committed + */ + protected void applyLogToStateMachine(final long index) { + long newLastApplied = context.getLastApplied(); + // Now maybe we apply to the state machine + for (long i = context.getLastApplied() + 1; + i < index + 1; i++) { + ActorRef clientActor = null; + String identifier = null; + ClientRequestTracker tracker = removeClientRequestTracker(i); + + if (tracker != null) { + clientActor = tracker.getClientActor(); + identifier = tracker.getIdentifier(); } - raftState = handleAppendEntries(sender, appendEntries, raftState); - } else if (message instanceof AppendEntriesReply) { - raftState = - handleAppendEntriesReply(sender, (AppendEntriesReply) message, - raftState); - } else if (message instanceof RequestVote) { - raftState = - handleRequestVote(sender, (RequestVote) message, raftState); - } else if (message instanceof RequestVoteReply) { - raftState = - handleRequestVoteReply(sender, (RequestVoteReply) message, - raftState); + ReplicatedLogEntry replicatedLogEntry = + context.getReplicatedLog().get(i); + + if (replicatedLogEntry != null) { + // Send a local message to the local RaftActor (it's derived class to be + // specific to apply the log to it's index) + actor().tell(new ApplyState(clientActor, identifier, + replicatedLogEntry), actor()); + newLastApplied = i; + } else { + //if one index is not present in the log, no point in looping + // around as the rest wont be present either + LOG.warning( + "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index); + break; + } + } + if(LOG.isDebugEnabled()) { + LOG.debug("Setting last applied to {}", newLastApplied); } - return raftState; + context.setLastApplied(newLastApplied); + + // send a message to persist a ApplyLogEntries marker message into akka's persistent journal + // will be used during recovery + //in case if the above code throws an error and this message is not sent, it would be fine + // as the append entries received later would initiate add this message to the journal + actor().tell(new ApplyLogEntries((int) context.getLastApplied()), actor()); } - private RaftState applyTerm(RaftRPC rpc) { - // If RPC request or response contains term T > currentTerm: - // set currentTerm = T, convert to follower (§5.1) - // This applies to all RPC messages and responses - if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { - context.getTermInformation().update(rpc.getTerm(), null); - return RaftState.Follower; + protected Object fromSerializableMessage(Object serializable){ + return SerializationUtils.fromSerializable(serializable); + } + + @Override + public RaftActorBehavior handleMessage(ActorRef sender, Object message) { + if (message instanceof AppendEntries) { + return appendEntries(sender, (AppendEntries) message); + } else if (message instanceof AppendEntriesReply) { + return handleAppendEntriesReply(sender, (AppendEntriesReply) message); + } else if (message instanceof RequestVote) { + return requestVote(sender, (RequestVote) message); + } else if (message instanceof RequestVoteReply) { + return handleRequestVoteReply(sender, (RequestVoteReply) message); } - return state(); + return this; } - private void applyLogToStateMachine(long index) { - // Send a local message to the local RaftActor (it's derived class to be - // specific to apply the log to it's index) - context.setLastApplied(index); + @Override public String getLeaderId() { + return leaderId; } + protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) { + LOG.info("Switching from behavior {} to {}", this.state(), behavior.state()); + try { + close(); + } catch (Exception e) { + LOG.error(e, "Failed to close behavior : {}", this.state()); + } + return behavior; + } }