From: Moiz Raja Date: Sat, 12 Jul 2014 12:39:52 +0000 (-0700) Subject: Properly handle RequestVote in all states X-Git-Tag: release/helium~439 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=7be62e955c32ff7fa10753c4307199b287b1904c Properly handle RequestVote in all states Change-Id: I31987824a2219be230e439ecb152e031c749d4ef Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java index 4b0367f0ad..664ab5e7b2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java @@ -8,8 +8,6 @@ package org.opendaylight.controller.cluster.raft; -import java.util.concurrent.atomic.AtomicLong; - /** * ElectionTerm contains information about a RaftActors election term. *

@@ -23,7 +21,7 @@ public interface ElectionTerm { * latest term server has seen (initialized to 0 * on first boot, increases monotonically) */ - AtomicLong getCurrentTerm(); + long getCurrentTerm(); /** * candidateId that received vote in current diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java index e75e0c5bb6..6a598be680 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java @@ -8,15 +8,13 @@ package org.opendaylight.controller.cluster.raft; -import java.util.concurrent.atomic.AtomicLong; - public class ElectionTermImpl implements ElectionTerm{ /** * Identifier of the actor whose election term information this is */ private final String id; - private AtomicLong currentTerm; + private long currentTerm; private String votedFor; @@ -24,13 +22,13 @@ public class ElectionTermImpl implements ElectionTerm{ this.id = id; // TODO: Read currentTerm from some persistent state - currentTerm = new AtomicLong(0); + currentTerm = 0; // TODO: Read votedFor from some file votedFor = ""; } - public AtomicLong getCurrentTerm() { + public long getCurrentTerm() { return currentTerm; } @@ -39,7 +37,7 @@ public class ElectionTermImpl implements ElectionTerm{ } public void update(long currentTerm, String votedFor){ - this.currentTerm.set(currentTerm); + this.currentTerm = currentTerm; this.votedFor = votedFor; // TODO : Write to some persistent state diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 6e8e8208fb..5e628bdd1b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -15,7 +15,6 @@ import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import java.util.Collections; -import java.util.concurrent.atomic.AtomicLong; /** * RaftActor encapsulates a state machine that needs to be kept synchronized @@ -77,7 +76,7 @@ public abstract class RaftActor extends UntypedEventsourcedProcessor { context = new RaftActorContextImpl(this.getSelf(), this.getContext(), id, new ElectionTermImpl(id), - new AtomicLong(0), new AtomicLong(0), new ReplicatedLogImpl()); + 0, 0, new ReplicatedLogImpl()); currentBehavior = switchBehavior(RaftState.Follower); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index 554461a76c..4bc9162fb5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -13,8 +13,6 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; -import java.util.concurrent.atomic.AtomicLong; - /** * The RaftActorContext contains that portion of the RaftActors state that * needs to be shared with it's behaviors. A RaftActorContext should NEVER be @@ -62,7 +60,13 @@ public interface RaftActorContext { * monotonically) * @return */ - AtomicLong getCommitIndex(); + long getCommitIndex(); + + + /** + * + */ + void setCommitIndex(long commitIndex); /** * index of highest log entry applied to state @@ -70,7 +74,13 @@ public interface RaftActorContext { * monotonically) * @return */ - AtomicLong getLastApplied(); + long getLastApplied(); + + + /** + * + */ + void setLastApplied(long lastApplied); /** * @return A representation of the log diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 1fdc3c628f..845011a7e3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -14,8 +14,6 @@ import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.UntypedActorContext; -import java.util.concurrent.atomic.AtomicLong; - public class RaftActorContextImpl implements RaftActorContext{ private final ActorRef actor; @@ -26,16 +24,16 @@ public class RaftActorContextImpl implements RaftActorContext{ private final ElectionTerm termInformation; - private final AtomicLong commitIndex; + private long commitIndex; - private final AtomicLong lastApplied; + private long lastApplied; private final ReplicatedLog replicatedLog; public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id, - ElectionTerm termInformation, AtomicLong commitIndex, - AtomicLong lastApplied, ReplicatedLog replicatedLog) { + ElectionTerm termInformation, long commitIndex, + long lastApplied, ReplicatedLog replicatedLog) { this.actor = actor; this.context = context; this.id = id; @@ -65,14 +63,22 @@ public class RaftActorContextImpl implements RaftActorContext{ return termInformation; } - public AtomicLong getCommitIndex() { + public long getCommitIndex() { return commitIndex; } - public AtomicLong getLastApplied() { + @Override public void setCommitIndex(long commitIndex) { + this.commitIndex = commitIndex; + } + + public long getLastApplied() { return lastApplied; } + @Override public void setLastApplied(long lastApplied) { + this.lastApplied = lastApplied; + } + @Override public ReplicatedLog getReplicatedLog() { return replicatedLog; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index d7a8d5abb3..ae1baec471 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -9,13 +9,19 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; +import akka.actor.Cancellable; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import scala.concurrent.duration.FiniteDuration; + +import java.util.Random; +import java.util.concurrent.TimeUnit; /** * Abstract class that represents the behavior of a RaftActor @@ -34,6 +40,22 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { */ protected final RaftActorContext context; + /** + * The maximum election time variance + */ + private static final int ELECTION_TIME_MAX_VARIANCE = 100; + + /** + * The interval in which a new election would get triggered if no leader is found + */ + private static final long ELECTION_TIME_INTERVAL = Leader.HEART_BEAT_INTERVAL.toMillis() * 2; + + /** + * + */ + + private Cancellable electionCancel = null; + protected AbstractRaftActorBehavior(RaftActorContext context) { this.context = context; @@ -76,23 +98,45 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected abstract RaftState handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply, RaftState suggestedState); - /** - * Derived classes should not directly handle RequestVote messages it - * should let the base class handle it first. Once the base class handles - * the RequestVote message and does the common actions that are applicable - * in all RaftState's it will delegate the handling of the RequestVote - * message to the derived class to do more state specific handling by calling - * this method - * - * @param sender The actor that sent this message - * @param requestVote The RequestVote message - * @param suggestedState The state that the RaftActor should be in based - * on the base class's processing of the RequestVote - * message - * @return - */ - protected abstract RaftState handleRequestVote(ActorRef sender, - RequestVote requestVote, RaftState suggestedState); + protected RaftState handleRequestVote(ActorRef sender, + RequestVote requestVote, RaftState suggestedState){ + + boolean grantVote = false; + + // Reply false if term < currentTerm (§5.1) + if(requestVote.getTerm() < currentTerm()){ + grantVote = false; + + // If votedFor is null or candidateId, and candidate’s log is at + // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4) + } else if (votedFor() == null || votedFor().equals(requestVote.getCandidateId())) { + + boolean candidateLatest = false; + + // From §5.4.1 + // Raft determines which of two logs is more up-to-date + // by comparing the index and term of the last entries in the + // logs. If the logs have last entries with different terms, then + // the log with the later term is more up-to-date. If the logs + // end with the same term, then whichever log is longer is + // more up-to-date. + if(requestVote.getLastLogTerm() > lastTerm()){ + candidateLatest = true; + } else if((requestVote.getLastLogTerm() == lastTerm()) && requestVote.getLastLogIndex() >= lastTerm()){ + candidateLatest = true; + } + + if(candidateLatest) { + grantVote = true; + context.getTermInformation().update(requestVote.getTerm(), + requestVote.getCandidateId()); + } + } + + sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor()); + + return suggestedState; + } /** * Derived classes should not directly handle RequestVoteReply messages it @@ -119,6 +163,46 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { */ protected abstract RaftState state(); + protected FiniteDuration electionDuration(){ + long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE); + return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, TimeUnit.MILLISECONDS); + } + + protected void scheduleElection(FiniteDuration interval) { + + if (electionCancel != null && !electionCancel.isCancelled()) { + electionCancel.cancel(); + } + + // Schedule an election. When the scheduler triggers an ElectionTimeout + // message is sent to itself + electionCancel = + context.getActorSystem().scheduler().scheduleOnce(interval, + context.getActor(), new ElectionTimeout(), + context.getActorSystem().dispatcher(), context.getActor()); + } + + protected long currentTerm(){ + return context.getTermInformation().getCurrentTerm(); + } + + protected String votedFor(){ + return context.getTermInformation().getVotedFor(); + } + + protected ActorRef actor(){ + return context.getActor(); + } + + protected long lastTerm() { + return context.getReplicatedLog().last().getTerm(); + } + + protected long lastIndex() { + return context.getReplicatedLog().last().getIndex(); + } + + @Override public RaftState handleMessage(ActorRef sender, Object message) { RaftState raftState = state(); @@ -127,8 +211,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } if (message instanceof AppendEntries) { AppendEntries appendEntries = (AppendEntries) message; - if (appendEntries.getLeaderCommit() > context.getLastApplied() - .get()) { + if (appendEntries.getLeaderCommit() > context.getLastApplied()) { applyLogToStateMachine(appendEntries.getLeaderCommit()); } raftState = handleAppendEntries(sender, appendEntries, raftState); @@ -148,8 +231,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } private RaftState applyTerm(RaftRPC rpc) { - if (rpc.getTerm() > context.getTermInformation().getCurrentTerm() - .get()) { + // If RPC request or response contains term T > currentTerm: + // set currentTerm = T, convert to follower (§5.1) + // This applies to all RPC messages and responses + if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { context.getTermInformation().update(rpc.getTerm(), null); return RaftState.Follower; } @@ -157,6 +242,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } private void applyLogToStateMachine(long index) { - context.getLastApplied().set(index); + // Send a local message to the local RaftActor (it's derived class to be + // specific to apply the log to it's index) + context.setLastApplied(index); } + + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java index f61905e393..3e6b502631 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java @@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import akka.actor.Cancellable; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; @@ -18,14 +17,10 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; -import scala.concurrent.duration.FiniteDuration; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; /** * The behavior of a RaftActor when it is in the CandidateState @@ -47,23 +42,8 @@ import java.util.concurrent.atomic.AtomicLong; */ public class Candidate extends AbstractRaftActorBehavior { - /** - * The maximum election time variance - */ - private static final int ELECTION_TIME_MAX_VARIANCE = 100; - - /** - * The interval in which a new election would get triggered if no leader is found - */ - private static final long ELECTION_TIME_INTERVAL = Leader.HEART_BEAT_INTERVAL.toMillis() * 2; - - /** - * - */ private final Map peerToActor = new HashMap<>(); - private Cancellable electionCancel = null; - private int voteCount; private final int votesRequired; @@ -96,7 +76,8 @@ public class Candidate extends AbstractRaftActorBehavior { votesRequired = 0; } - scheduleElection(randomizedDuration()); + startNewTerm(); + scheduleElection(electionDuration()); } @Override protected RaftState handleAppendEntries(ActorRef sender, @@ -105,7 +86,7 @@ public class Candidate extends AbstractRaftActorBehavior { // There is some peer who thinks it's a leader but is not // I will not accept this append entries sender.tell(new AppendEntriesReply( - context.getTermInformation().getCurrentTerm().get(), false), + context.getTermInformation().getCurrentTerm(), false), context.getActor()); return suggestedState; @@ -119,18 +100,6 @@ public class Candidate extends AbstractRaftActorBehavior { return suggestedState; } - @Override protected RaftState handleRequestVote(ActorRef sender, - RequestVote requestVote, RaftState suggestedState) { - - // We got this RequestVote because the term in there is less than - // or equal to our current term, so do not grant the vote - sender.tell(new RequestVoteReply( - context.getTermInformation().getCurrentTerm().get(), false), - context.getActor()); - - return suggestedState; - } - @Override protected RaftState handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply, RaftState suggestedState) { if(suggestedState == RaftState.Follower) { @@ -164,45 +133,32 @@ public class Candidate extends AbstractRaftActorBehavior { // to send a message to the candidate return RaftState.Leader; } - scheduleElection(randomizedDuration()); + startNewTerm(); + scheduleElection(electionDuration()); return state(); } return super.handleMessage(sender, message); } - private FiniteDuration randomizedDuration(){ - long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE); - return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, TimeUnit.MILLISECONDS); - } - - private void scheduleElection(FiniteDuration interval) { + private void startNewTerm(){ // set voteCount back to 1 (that is voting for self) voteCount = 1; // Increment the election term and vote for self - AtomicLong currentTerm = context.getTermInformation().getCurrentTerm(); - context.getTermInformation().update(currentTerm.incrementAndGet(), context.getId()); + long currentTerm = context.getTermInformation().getCurrentTerm(); + context.getTermInformation().update(currentTerm+1, context.getId()); // Request for a vote for(ActorSelection peerActor : peerToActor.values()){ peerActor.tell(new RequestVote( - context.getTermInformation().getCurrentTerm().get(), - context.getId(), context.getReplicatedLog().last().getIndex(), - context.getReplicatedLog().last().getTerm()), + context.getTermInformation().getCurrentTerm(), + context.getId(), context.getReplicatedLog().last().getIndex(), + context.getReplicatedLog().last().getTerm()), context.getActor()); } - if (electionCancel != null && !electionCancel.isCancelled()) { - electionCancel.cancel(); - } - // Schedule an election. When the scheduler triggers an ElectionTimeout - // message is sent to itself - electionCancel = - context.getActorSystem().scheduler().scheduleOnce(interval, - context.getActor(), new ElectionTimeout(), - context.getActorSystem().dispatcher(), context.getActor()); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 1bdcc8bdb4..d93271072c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -11,9 +11,9 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; -import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; /** @@ -30,6 +30,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; public class Follower extends AbstractRaftActorBehavior { public Follower(RaftActorContext context) { super(context); + + scheduleElection(electionDuration()); } @Override protected RaftState handleAppendEntries(ActorRef sender, @@ -42,11 +44,6 @@ public class Follower extends AbstractRaftActorBehavior { return suggestedState; } - @Override protected RaftState handleRequestVote(ActorRef sender, - RequestVote requestVote, RaftState suggestedState) { - return suggestedState; - } - @Override protected RaftState handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply, RaftState suggestedState) { return suggestedState; @@ -57,6 +54,12 @@ public class Follower extends AbstractRaftActorBehavior { } @Override public RaftState handleMessage(ActorRef sender, Object message) { + if(message instanceof ElectionTimeout){ + return RaftState.Candidate; + } + + scheduleElection(electionDuration()); + return super.handleMessage(sender, message); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 5c37455be9..0498d7fc8b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -19,7 +19,6 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; -import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import scala.concurrent.duration.FiniteDuration; @@ -105,11 +104,6 @@ public class Leader extends AbstractRaftActorBehavior { return suggestedState; } - @Override protected RaftState handleRequestVote(ActorRef sender, - RequestVote requestVote, RaftState suggestedState) { - return suggestedState; - } - @Override protected RaftState handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply, RaftState suggestedState) { return suggestedState; @@ -127,11 +121,11 @@ public class Leader extends AbstractRaftActorBehavior { if (message instanceof SendHeartBeat) { for (ActorSelection follower : followerToActor.values()) { follower.tell(new AppendEntries( - context.getTermInformation().getCurrentTerm().get(), + context.getTermInformation().getCurrentTerm(), context.getId(), context.getReplicatedLog().last().getIndex(), context.getReplicatedLog().last().getTerm(), - Collections.EMPTY_LIST, context.getCommitIndex().get()), + Collections.EMPTY_LIST, context.getCommitIndex()), context.getActor()); } return state(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index c265388f6b..2a1cf9ab87 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -13,19 +13,20 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; -import java.util.concurrent.atomic.AtomicLong; - public class MockRaftActorContext implements RaftActorContext { private String id; private ActorSystem system; private ActorRef actor; - private AtomicLong index = new AtomicLong(0); - private AtomicLong lastApplied = new AtomicLong(0); + private long index = 0; + private long lastApplied = 0; private final ElectionTerm electionTerm; + private ReplicatedLog replicatedLog; public MockRaftActorContext(){ electionTerm = null; + + initReplicatedLog(); } public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){ @@ -34,6 +35,16 @@ public class MockRaftActorContext implements RaftActorContext { this.actor = actor; electionTerm = new ElectionTermImpl(id); + + initReplicatedLog(); + } + + + public void initReplicatedLog(){ + MockReplicatedLog mockReplicatedLog = new MockReplicatedLog(); + this.replicatedLog = mockReplicatedLog; + mockReplicatedLog.setLast(new MockReplicatedLogEntry(1,1,"")); + mockReplicatedLog.setReplicatedLogEntry(new MockReplicatedLogEntry(1,1, "")); } @Override public ActorRef actorOf(Props props) { @@ -56,50 +67,84 @@ public class MockRaftActorContext implements RaftActorContext { return electionTerm; } - public void setIndex(AtomicLong index){ + public void setIndex(long index){ this.index = index; } - @Override public AtomicLong getCommitIndex() { + @Override public long getCommitIndex() { return index; } - public void setLastApplied(AtomicLong lastApplied){ + @Override public void setCommitIndex(long commitIndex) { + this.index = commitIndex; + } + + @Override public void setLastApplied(long lastApplied){ this.lastApplied = lastApplied; } - @Override public AtomicLong getLastApplied() { + @Override public long getLastApplied() { return lastApplied; } + public void setReplicatedLog(ReplicatedLog replicatedLog) { + this.replicatedLog = replicatedLog; + } + @Override public ReplicatedLog getReplicatedLog() { - return new ReplicatedLog(){ + return replicatedLog; + } - @Override public ReplicatedLogEntry getReplicatedLogEntry( - long index) { - throw new UnsupportedOperationException( - "getReplicatedLogEntry"); - } + @Override public ActorSystem getActorSystem() { + return this.system; + } + + + public static class MockReplicatedLog implements ReplicatedLog { + private ReplicatedLogEntry replicatedLogEntry = new MockReplicatedLogEntry(0,0, ""); + private ReplicatedLogEntry last = new MockReplicatedLogEntry(0,0, ""); - @Override public ReplicatedLogEntry last() { - return new ReplicatedLogEntry() { - @Override public Object getData() { - return null; - } + @Override public ReplicatedLogEntry getReplicatedLogEntry(long index) { + return replicatedLogEntry; + } - @Override public long getTerm() { - return 1; - } + @Override public ReplicatedLogEntry last() { + return last; + } - @Override public long getIndex() { - return 1; - } - }; - } - }; + public void setReplicatedLogEntry( + ReplicatedLogEntry replicatedLogEntry) { + this.replicatedLogEntry = replicatedLogEntry; + } + + public void setLast(ReplicatedLogEntry last) { + this.last = last; + } } - @Override public ActorSystem getActorSystem() { - return this.system; + public static class MockReplicatedLogEntry implements ReplicatedLogEntry { + + private final long term; + private final long index; + private final Object data; + + public MockReplicatedLogEntry(long term, long index, Object data){ + + this.term = term; + this.index = index; + this.data = data; + } + + @Override public Object getData() { + return data; + } + + @Override public long getTerm() { + return term; + } + + @Override public long getIndex() { + return index; + } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java index e6bf26cdcd..1e7aa6a1d5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java @@ -1,6 +1,7 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; +import akka.actor.Props; import akka.testkit.JavaTestKit; import org.junit.Test; import org.opendaylight.controller.cluster.raft.AbstractActorTest; @@ -12,12 +13,17 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; import java.util.concurrent.atomic.AtomicLong; import static org.junit.Assert.assertEquals; public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{ + + private final ActorRef behaviorActor = getSystem().actorOf(Props.create( + DoNothingActor.class)); + @Test public void testHandlingOfRaftRPCWithNewerTerm() throws Exception { new JavaTestKit(getSystem()) {{ @@ -45,7 +51,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{ RaftActorContext context = createActorContext(); - ((MockRaftActorContext) context).setLastApplied(new AtomicLong(100)); + ((MockRaftActorContext) context).setLastApplied(100); AppendEntries appendEntries = new AppendEntries(100, "leader-1", 0, 0, null, 101); @@ -53,8 +59,114 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{ RaftState raftState = createBehavior(context).handleMessage(getRef(), appendEntries); - assertEquals(new AtomicLong(101).get(), context.getLastApplied().get()); + assertEquals(new AtomicLong(101).get(), context.getLastApplied()); + + }}; + } + + @Test + public void testHandleRequestVoteWhenSenderTermGreaterThanCurrentTermAndSenderLogMoreUpToDate(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + RaftActorBehavior follower = createBehavior( + createActorContext(behaviorActor)); + + follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999)); + + final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof RequestVoteReply) { + RequestVoteReply reply = (RequestVoteReply) in; + return reply.isVoteGranted(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(true, out); + } + }; + }}; + } + + @Test + public void testHandleRequestVoteWhenSenderTermGreaterThanCurrentTermButSenderLogLessUptoDate(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + RaftActorContext actorContext = + createActorContext(behaviorActor); + + MockRaftActorContext.MockReplicatedLog log = new MockRaftActorContext.MockReplicatedLog(); + log.setReplicatedLogEntry(new MockRaftActorContext.MockReplicatedLogEntry(20000, 1000000, "")); + log.setLast( + new MockRaftActorContext.MockReplicatedLogEntry(20000, + 1000000, "")); + + ((MockRaftActorContext) actorContext).setReplicatedLog(log); + + RaftActorBehavior follower = createBehavior(actorContext); + + follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999)); + final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof RequestVoteReply) { + RequestVoteReply reply = (RequestVoteReply) in; + return reply.isVoteGranted(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(false, out); + } + }; + }}; + } + + + + + @Test + public void testHandleRequestVoteWhenSenderTermLessThanCurrentTerm(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + RaftActorContext context = createActorContext(behaviorActor); + + context.getTermInformation().update(1000, null); + + RaftActorBehavior follower = createBehavior(context); + + follower.handleMessage(getTestActor(), new RequestVote(999, "test", 10000, 999)); + + final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof RequestVoteReply) { + RequestVoteReply reply = (RequestVoteReply) in; + return reply.isVoteGranted(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(false, out); + } + }; }}; } @@ -76,6 +188,10 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{ return new MockRaftActorContext(); } + protected RaftActorContext createActorContext(ActorRef actor) { + return new MockRaftActorContext("test", getSystem(), actor); + } + protected AppendEntries createAppendEntriesWithNewerTerm(){ return new AppendEntries(100, "leader-1", 0, 0, null, 1); } @@ -92,4 +208,6 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{ return new RequestVoteReply(100, false); } + + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java index 9efdbd7c54..183c668fca 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java @@ -40,11 +40,11 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { @Test public void testWhenACandidateIsCreatedItIncrementsTheCurrentTermAndVotesForItself(){ RaftActorContext raftActorContext = createActorContext(); - long expectedTerm = raftActorContext.getTermInformation().getCurrentTerm().get(); + long expectedTerm = raftActorContext.getTermInformation().getCurrentTerm(); new Candidate(raftActorContext, Collections.EMPTY_LIST); - assertEquals(expectedTerm+1, raftActorContext.getTermInformation().getCurrentTerm().get()); + assertEquals(expectedTerm+1, raftActorContext.getTermInformation().getCurrentTerm()); assertEquals(raftActorContext.getId(), raftActorContext.getTermInformation().getVotedFor()); } @@ -190,6 +190,74 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { }}; } + @Test + public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + RaftActorContext context = createActorContext(getTestActor()); + + context.getTermInformation().update(1000, null); + + // Once a candidate is created it will immediately increment the current term so after + // construction the currentTerm should be 1001 + RaftActorBehavior follower = createBehavior(context); + + follower.handleMessage(getTestActor(), new RequestVote(1001, "test", 10000, 999)); + + final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof RequestVoteReply) { + RequestVoteReply reply = (RequestVoteReply) in; + return reply.isVoteGranted(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(true, out); + } + }; + }}; + } + + @Test + public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + RaftActorContext context = createActorContext(getTestActor()); + + context.getTermInformation().update(1000, "test"); + + RaftActorBehavior follower = createBehavior(context); + + follower.handleMessage(getTestActor(), new RequestVote(1001, "candidate", 10000, 999)); + + final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof RequestVoteReply) { + RequestVoteReply reply = (RequestVoteReply) in; + return reply.isVoteGranted(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(false, out); + } + }; + }}; + } + @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) { @@ -200,8 +268,5 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { return new MockRaftActorContext("test", getSystem(), candidateActor); } - protected RaftActorContext createActorContext(ActorRef candidateActor) { - return new MockRaftActorContext("test", getSystem(), candidateActor); - } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 5f4f3632ca..90acbb1eae 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -2,10 +2,19 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.Props; +import akka.testkit.JavaTestKit; +import junit.framework.Assert; +import org.junit.Test; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; +import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; +import static org.junit.Assert.assertEquals; + public class FollowerTest extends AbstractRaftActorBehaviorTest { private final ActorRef followerActor = getSystem().actorOf(Props.create( @@ -20,4 +29,108 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { return new MockRaftActorContext("test", getSystem(), followerActor); } + @Test + public void testThatAnElectionTimeoutIsTriggered(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + Follower follower = new Follower(createActorContext(getTestActor())); + + final Boolean out = new ExpectMsg(duration("1 seconds"), "ElectionTimeout") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof ElectionTimeout) { + return true; + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(true, out); + } + }; + }}; + } + + @Test + public void testHandleElectionTimeout(){ + RaftActorContext raftActorContext = createActorContext(); + Follower follower = + new Follower(raftActorContext); + + RaftState raftState = + follower.handleMessage(followerActor, new ElectionTimeout()); + + Assert.assertEquals(RaftState.Candidate, raftState); + } + + @Test + public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + RaftActorContext context = createActorContext(getTestActor()); + + context.getTermInformation().update(1000, null); + + RaftActorBehavior follower = createBehavior(context); + + follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999)); + + final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof RequestVoteReply) { + RequestVoteReply reply = (RequestVoteReply) in; + return reply.isVoteGranted(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(true, out); + } + }; + }}; + } + + @Test + public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + RaftActorContext context = createActorContext(getTestActor()); + + context.getTermInformation().update(1000, "test"); + + RaftActorBehavior follower = createBehavior(context); + + follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999)); + + final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof RequestVoteReply) { + RequestVoteReply reply = (RequestVoteReply) in; + return reply.isVoteGranted(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(false, out); + } + }; + }}; + } + }