X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractRaftActorBehavior.java;h=167082711d82b46c61e90e8311473d051e06461c;hp=ae1baec471ef1d08abdcbb6d9c56052b95a87ffa;hb=a0c5aba42aa36337ff1c6760175918b786897c9e;hpb=7be62e955c32ff7fa10753c4307199b287b1904c diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index ae1baec471..167082711d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -12,6 +12,7 @@ import akka.actor.ActorRef; import akka.actor.Cancellable; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; @@ -45,10 +46,21 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { */ private static final int ELECTION_TIME_MAX_VARIANCE = 100; + /** + * The interval at which a heart beat message will be sent to the remote + * RaftActor + *

+ * Since this is set to 100 milliseconds the Election timeout should be + * at least 200 milliseconds + */ + protected static final FiniteDuration HEART_BEAT_INTERVAL = + new FiniteDuration(100, TimeUnit.MILLISECONDS); + /** * The interval in which a new election would get triggered if no leader is found */ - private static final long ELECTION_TIME_INTERVAL = Leader.HEART_BEAT_INTERVAL.toMillis() * 2; + private static final long ELECTION_TIME_INTERVAL = + HEART_BEAT_INTERVAL.toMillis() * 2; /** * @@ -79,6 +91,71 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected abstract RaftState handleAppendEntries(ActorRef sender, AppendEntries appendEntries, RaftState suggestedState); + + protected RaftState appendEntries(ActorRef sender, + AppendEntries appendEntries, RaftState raftState){ + + // 1. Reply false if term < currentTerm (§5.1) + if(appendEntries.getTerm() < currentTerm()){ + sender.tell(new AppendEntriesReply(currentTerm(), false), actor()); + return state(); + } + + // 2. Reply false if log doesn’t contain an entry at prevLogIndex + // whose term matches prevLogTerm (§5.3) + ReplicatedLogEntry previousEntry = context.getReplicatedLog() + .get(appendEntries.getPrevLogIndex()); + + if(previousEntry == null || previousEntry.getTerm() != appendEntries.getPrevLogTerm()){ + sender.tell(new AppendEntriesReply(currentTerm(), false), actor()); + return state(); + } + + if(appendEntries.getEntries() != null) { + // 3. If an existing entry conflicts with a new one (same index + // but different terms), delete the existing entry and all that + // follow it (§5.3) + int addEntriesFrom = 0; + for (int i = 0; + i < appendEntries.getEntries().size(); i++, addEntriesFrom++) { + ReplicatedLogEntry newEntry = context.getReplicatedLog() + .get(i + 1); + + if (newEntry != null && newEntry.getTerm() == appendEntries.getEntries().get(i).getTerm()){ + break; + } + if (newEntry != null && newEntry.getTerm() != appendEntries + .getEntries().get(i).getTerm()) { + context.getReplicatedLog().removeFrom(i + 1); + break; + } + } + + // 4. Append any new entries not already in the log + for (int i = addEntriesFrom; + i < appendEntries.getEntries().size(); i++) { + context.getReplicatedLog() + .append(appendEntries.getEntries().get(i)); + } + } + + + // 5. If leaderCommit > commitIndex, set commitIndex = + // min(leaderCommit, index of last new entry) + context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(), + context.getReplicatedLog().last().getIndex())); + + // If commitIndex > lastApplied: increment lastApplied, apply + // log[lastApplied] to state machine (§5.3) + if (appendEntries.getLeaderCommit() > context.getLastApplied()) { + applyLogToStateMachine(appendEntries.getLeaderCommit()); + } + + sender.tell(new AppendEntriesReply(currentTerm(), true), actor()); + + return handleAppendEntries(sender, appendEntries, raftState); + } + /** * Derived classes should not directly handle AppendEntriesReply messages it * should let the base class handle it first. Once the base class handles @@ -98,18 +175,19 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected abstract RaftState handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply, RaftState suggestedState); - protected RaftState handleRequestVote(ActorRef sender, - RequestVote requestVote, RaftState suggestedState){ + protected RaftState requestVote(ActorRef sender, + RequestVote requestVote, RaftState suggestedState) { boolean grantVote = false; // Reply false if term < currentTerm (§5.1) - if(requestVote.getTerm() < currentTerm()){ + if (requestVote.getTerm() < currentTerm()) { grantVote = false; - // If votedFor is null or candidateId, and candidate’s log is at - // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4) - } else if (votedFor() == null || votedFor().equals(requestVote.getCandidateId())) { + // If votedFor is null or candidateId, and candidate’s log is at + // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4) + } else if (votedFor() == null || votedFor() + .equals(requestVote.getCandidateId())) { boolean candidateLatest = false; @@ -120,13 +198,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // the log with the later term is more up-to-date. If the logs // end with the same term, then whichever log is longer is // more up-to-date. - if(requestVote.getLastLogTerm() > lastTerm()){ + if (requestVote.getLastLogTerm() > lastTerm()) { candidateLatest = true; - } else if((requestVote.getLastLogTerm() == lastTerm()) && requestVote.getLastLogIndex() >= lastTerm()){ + } else if ((requestVote.getLastLogTerm() == lastTerm()) + && requestVote.getLastLogIndex() >= lastTerm()) { candidateLatest = true; } - if(candidateLatest) { + if (candidateLatest) { grantVote = true; context.getTermInformation().update(requestVote.getTerm(), requestVote.getCandidateId()); @@ -163,9 +242,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { */ protected abstract RaftState state(); - protected FiniteDuration electionDuration(){ + protected FiniteDuration electionDuration() { long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE); - return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, TimeUnit.MILLISECONDS); + return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, + TimeUnit.MILLISECONDS); } protected void scheduleElection(FiniteDuration interval) { @@ -182,15 +262,15 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { context.getActorSystem().dispatcher(), context.getActor()); } - protected long currentTerm(){ + protected long currentTerm() { return context.getTermInformation().getCurrentTerm(); } - protected String votedFor(){ + protected String votedFor() { return context.getTermInformation().getVotedFor(); } - protected ActorRef actor(){ + protected ActorRef actor() { return context.getActor(); } @@ -210,18 +290,15 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { raftState = applyTerm((RaftRPC) message); } if (message instanceof AppendEntries) { - AppendEntries appendEntries = (AppendEntries) message; - if (appendEntries.getLeaderCommit() > context.getLastApplied()) { - applyLogToStateMachine(appendEntries.getLeaderCommit()); - } - raftState = handleAppendEntries(sender, appendEntries, raftState); + raftState = appendEntries(sender, (AppendEntries) message, + raftState); } else if (message instanceof AppendEntriesReply) { raftState = handleAppendEntriesReply(sender, (AppendEntriesReply) message, raftState); } else if (message instanceof RequestVote) { raftState = - handleRequestVote(sender, (RequestVote) message, raftState); + requestVote(sender, (RequestVote) message, raftState); } else if (message instanceof RequestVoteReply) { raftState = handleRequestVoteReply(sender, (RequestVoteReply) message,