X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractRaftActorBehavior.java;h=0a553b40fd59aab555f258f897a2154830afd1c8;hb=5a9287bb6ddaaa8805939b3b3301f648c03785f4;hp=304b2fdbab0b8c242ae01e4768547cd8d5861a46;hpb=fdab53ef9033fc83c812f7d3d6d3327d3d176f0f;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 304b2fdbab..0a553b40fd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -14,11 +14,11 @@ import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.SerializationUtils; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; -import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import scala.concurrent.duration.FiniteDuration; @@ -43,27 +43,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { */ protected final RaftActorContext context; - /** - * The maximum election time variance - */ - private static final int ELECTION_TIME_MAX_VARIANCE = 100; - - /** - * The interval at which a heart beat message will be sent to the remote - * RaftActor - *

- * Since this is set to 100 milliseconds the Election timeout should be - * at least 200 milliseconds - */ - protected static final FiniteDuration HEART_BEAT_INTERVAL = - new FiniteDuration(100, TimeUnit.MILLISECONDS); - - /** - * The interval in which a new election would get triggered if no leader is found - */ - private static final long ELECTION_TIME_INTERVAL = - HEART_BEAT_INTERVAL.toMillis() * 2; - /** * */ @@ -89,22 +68,22 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender The actor that sent this message * @param appendEntries The AppendEntries message - * @param suggestedState The state that the RaftActor should be in based - * on the base class's processing of the AppendEntries - * message * @return */ protected abstract RaftState handleAppendEntries(ActorRef sender, - AppendEntries appendEntries, RaftState suggestedState); + AppendEntries appendEntries); + /** + * appendEntries first processes the AppendEntries message and then + * delegates handling to a specific behavior + * + * @param sender + * @param appendEntries + * @return + */ protected RaftState appendEntries(ActorRef sender, - AppendEntries appendEntries, RaftState raftState) { - - if (raftState != state()) { - context.getLogger().debug("Suggested state is " + raftState - + " current behavior state is " + state()); - } + AppendEntries appendEntries) { // 1. Reply false if term < currentTerm (§5.1) if (appendEntries.getTerm() < currentTerm()) { @@ -119,7 +98,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } - return handleAppendEntries(sender, appendEntries, raftState); + return handleAppendEntries(sender, appendEntries); } /** @@ -132,17 +111,21 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender The actor that sent this message * @param appendEntriesReply The AppendEntriesReply message - * @param suggestedState The state that the RaftActor should be in based - * on the base class's processing of the - * AppendEntriesReply message * @return */ - protected abstract RaftState handleAppendEntriesReply(ActorRef sender, - AppendEntriesReply appendEntriesReply, RaftState suggestedState); + AppendEntriesReply appendEntriesReply); + /** + * requestVote handles the RequestVote message. This logic is common + * for all behaviors + * + * @param sender + * @param requestVote + * @return + */ protected RaftState requestVote(ActorRef sender, - RequestVote requestVote, RaftState suggestedState) { + RequestVote requestVote) { boolean grantVote = false; @@ -173,14 +156,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { if (candidateLatest) { grantVote = true; - context.getTermInformation().update(requestVote.getTerm(), + context.getTermInformation().updateAndPersist(requestVote.getTerm(), requestVote.getCandidateId()); } } sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor()); - return suggestedState; + return state(); } /** @@ -193,29 +176,37 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender The actor that sent this message * @param requestVoteReply The RequestVoteReply message - * @param suggestedState The state that the RaftActor should be in based - * on the base class's processing of the RequestVote - * message * @return */ - protected abstract RaftState handleRequestVoteReply(ActorRef sender, - RequestVoteReply requestVoteReply, RaftState suggestedState); + RequestVoteReply requestVoteReply); + /** + * Creates a random election duration + * + * @return + */ protected FiniteDuration electionDuration() { - long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE); - return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, - TimeUnit.MILLISECONDS); + long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance()); + return context.getConfigParams().getElectionTimeOutInterval().$plus( + new FiniteDuration(variance, TimeUnit.MILLISECONDS)); } + /** + * stop the scheduled election + */ protected void stopElection() { if (electionCancel != null && !electionCancel.isCancelled()) { electionCancel.cancel(); } } + /** + * schedule a new election + * + * @param interval + */ protected void scheduleElection(FiniteDuration interval) { - stopElection(); // Schedule an election. When the scheduler triggers an ElectionTimeout @@ -226,30 +217,90 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { context.getActorSystem().dispatcher(), context.getActor()); } + /** + * Get the current term + * @return + */ protected long currentTerm() { return context.getTermInformation().getCurrentTerm(); } + /** + * Get the candidate for whom we voted in the current term + * @return + */ protected String votedFor() { return context.getTermInformation().getVotedFor(); } + /** + * Get the actor associated with this behavior + * @return + */ protected ActorRef actor() { return context.getActor(); } + /** + * Get the term from the last entry in the log + * + * @return + */ protected long lastTerm() { return context.getReplicatedLog().lastTerm(); } + /** + * Get the index from the last entry in the log + * + * @return + */ protected long lastIndex() { return context.getReplicatedLog().lastIndex(); } + /** + * Find the client request tracker for a specific logIndex + * + * @param logIndex + * @return + */ protected ClientRequestTracker findClientRequestTracker(long logIndex) { return null; } + /** + * Find the log index from the previous to last entry in the log + * + * @return + */ + protected long prevLogIndex(long index){ + ReplicatedLogEntry prevEntry = + context.getReplicatedLog().get(index - 1); + if (prevEntry != null) { + return prevEntry.getIndex(); + } + return -1; + } + + /** + * Find the log term from the previous to last entry in the log + * @return + */ + protected long prevLogTerm(long index){ + ReplicatedLogEntry prevEntry = + context.getReplicatedLog().get(index - 1); + if (prevEntry != null) { + return prevEntry.getTerm(); + } + return -1; + } + + /** + * Apply the provided index to the state machine + * + * @param index a log index that is known to be committed + */ protected void applyLogToStateMachine(long index) { // Now maybe we apply to the state machine for (long i = context.getLastApplied() + 1; @@ -278,43 +329,25 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { context.setLastApplied(index); } + protected Object fromSerializableMessage(Object serializable){ + return SerializationUtils.fromSerializable(serializable); + } + @Override public RaftState handleMessage(ActorRef sender, Object message) { - RaftState raftState = state(); - if (message instanceof RaftRPC) { - raftState = applyTerm((RaftRPC) message); - } if (message instanceof AppendEntries) { - raftState = appendEntries(sender, (AppendEntries) message, - raftState); + return appendEntries(sender, (AppendEntries) message); } else if (message instanceof AppendEntriesReply) { - raftState = - handleAppendEntriesReply(sender, (AppendEntriesReply) message, - raftState); + return handleAppendEntriesReply(sender, (AppendEntriesReply) message); } else if (message instanceof RequestVote) { - raftState = - requestVote(sender, (RequestVote) message, raftState); + return requestVote(sender, (RequestVote) message); } else if (message instanceof RequestVoteReply) { - raftState = - handleRequestVoteReply(sender, (RequestVoteReply) message, - raftState); + return handleRequestVoteReply(sender, (RequestVoteReply) message); } - return raftState; + return state(); } @Override public String getLeaderId() { return leaderId; } - - private RaftState applyTerm(RaftRPC rpc) { - // If RPC request or response contains term T > currentTerm: - // set currentTerm = T, convert to follower (§5.1) - // This applies to all RPC messages and responses - if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { - context.getTermInformation().update(rpc.getTerm(), null); - return RaftState.Follower; - } - return state(); - } - }