package org.opendaylight.controller.cluster.raft;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* ElectionTerm contains information about a RaftActors election term.
* <p>
* latest term server has seen (initialized to 0
* on first boot, increases monotonically)
*/
- AtomicLong getCurrentTerm();
+ long getCurrentTerm();
/**
* candidateId that received vote in current
package org.opendaylight.controller.cluster.raft;
-import java.util.concurrent.atomic.AtomicLong;
-
public class ElectionTermImpl implements ElectionTerm{
/**
* Identifier of the actor whose election term information this is
*/
private final String id;
- private AtomicLong currentTerm;
+ private long currentTerm;
private String votedFor;
this.id = id;
// TODO: Read currentTerm from some persistent state
- currentTerm = new AtomicLong(0);
+ currentTerm = 0;
// TODO: Read votedFor from some file
votedFor = "";
}
- public AtomicLong getCurrentTerm() {
+ public long getCurrentTerm() {
return currentTerm;
}
}
public void update(long currentTerm, String votedFor){
- this.currentTerm.set(currentTerm);
+ this.currentTerm = currentTerm;
this.votedFor = votedFor;
// TODO : Write to some persistent state
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import java.util.Collections;
-import java.util.concurrent.atomic.AtomicLong;
/**
* RaftActor encapsulates a state machine that needs to be kept synchronized
context = new RaftActorContextImpl(this.getSelf(),
this.getContext(),
id, new ElectionTermImpl(id),
- new AtomicLong(0), new AtomicLong(0), new ReplicatedLogImpl());
+ 0, 0, new ReplicatedLogImpl());
currentBehavior = switchBehavior(RaftState.Follower);
}
import akka.actor.ActorSystem;
import akka.actor.Props;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* The RaftActorContext contains that portion of the RaftActors state that
* needs to be shared with it's behaviors. A RaftActorContext should NEVER be
* monotonically)
* @return
*/
- AtomicLong getCommitIndex();
+ long getCommitIndex();
+
+
+ /**
+ *
+ */
+ void setCommitIndex(long commitIndex);
/**
* index of highest log entry applied to state
* monotonically)
* @return
*/
- AtomicLong getLastApplied();
+ long getLastApplied();
+
+
+ /**
+ *
+ */
+ void setLastApplied(long lastApplied);
/**
* @return A representation of the log
import akka.actor.Props;
import akka.actor.UntypedActorContext;
-import java.util.concurrent.atomic.AtomicLong;
-
public class RaftActorContextImpl implements RaftActorContext{
private final ActorRef actor;
private final ElectionTerm termInformation;
- private final AtomicLong commitIndex;
+ private long commitIndex;
- private final AtomicLong lastApplied;
+ private long lastApplied;
private final ReplicatedLog replicatedLog;
public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
String id,
- ElectionTerm termInformation, AtomicLong commitIndex,
- AtomicLong lastApplied, ReplicatedLog replicatedLog) {
+ ElectionTerm termInformation, long commitIndex,
+ long lastApplied, ReplicatedLog replicatedLog) {
this.actor = actor;
this.context = context;
this.id = id;
return termInformation;
}
- public AtomicLong getCommitIndex() {
+ public long getCommitIndex() {
return commitIndex;
}
- public AtomicLong getLastApplied() {
+ @Override public void setCommitIndex(long commitIndex) {
+ this.commitIndex = commitIndex;
+ }
+
+ public long getLastApplied() {
return lastApplied;
}
+ @Override public void setLastApplied(long lastApplied) {
+ this.lastApplied = lastApplied;
+ }
+
@Override public ReplicatedLog getReplicatedLog() {
return replicatedLog;
}
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import akka.actor.Cancellable;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
/**
* Abstract class that represents the behavior of a RaftActor
*/
protected final RaftActorContext context;
+ /**
+ * The maximum election time variance
+ */
+ private static final int ELECTION_TIME_MAX_VARIANCE = 100;
+
+ /**
+ * The interval in which a new election would get triggered if no leader is found
+ */
+ private static final long ELECTION_TIME_INTERVAL = Leader.HEART_BEAT_INTERVAL.toMillis() * 2;
+
+ /**
+ *
+ */
+
+ private Cancellable electionCancel = null;
+
protected AbstractRaftActorBehavior(RaftActorContext context) {
this.context = context;
protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
AppendEntriesReply appendEntriesReply, RaftState suggestedState);
- /**
- * Derived classes should not directly handle RequestVote messages it
- * should let the base class handle it first. Once the base class handles
- * the RequestVote message and does the common actions that are applicable
- * in all RaftState's it will delegate the handling of the RequestVote
- * message to the derived class to do more state specific handling by calling
- * this method
- *
- * @param sender The actor that sent this message
- * @param requestVote The RequestVote message
- * @param suggestedState The state that the RaftActor should be in based
- * on the base class's processing of the RequestVote
- * message
- * @return
- */
- protected abstract RaftState handleRequestVote(ActorRef sender,
- RequestVote requestVote, RaftState suggestedState);
+ protected RaftState handleRequestVote(ActorRef sender,
+ RequestVote requestVote, RaftState suggestedState){
+
+ boolean grantVote = false;
+
+ // Reply false if term < currentTerm (§5.1)
+ if(requestVote.getTerm() < currentTerm()){
+ grantVote = false;
+
+ // If votedFor is null or candidateId, and candidate’s log is at
+ // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
+ } else if (votedFor() == null || votedFor().equals(requestVote.getCandidateId())) {
+
+ boolean candidateLatest = false;
+
+ // From §5.4.1
+ // Raft determines which of two logs is more up-to-date
+ // by comparing the index and term of the last entries in the
+ // logs. If the logs have last entries with different terms, then
+ // the log with the later term is more up-to-date. If the logs
+ // end with the same term, then whichever log is longer is
+ // more up-to-date.
+ if(requestVote.getLastLogTerm() > lastTerm()){
+ candidateLatest = true;
+ } else if((requestVote.getLastLogTerm() == lastTerm()) && requestVote.getLastLogIndex() >= lastTerm()){
+ candidateLatest = true;
+ }
+
+ if(candidateLatest) {
+ grantVote = true;
+ context.getTermInformation().update(requestVote.getTerm(),
+ requestVote.getCandidateId());
+ }
+ }
+
+ sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor());
+
+ return suggestedState;
+ }
/**
* Derived classes should not directly handle RequestVoteReply messages it
*/
protected abstract RaftState state();
+ protected FiniteDuration electionDuration(){
+ long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
+ return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, TimeUnit.MILLISECONDS);
+ }
+
+ protected void scheduleElection(FiniteDuration interval) {
+
+ if (electionCancel != null && !electionCancel.isCancelled()) {
+ electionCancel.cancel();
+ }
+
+ // Schedule an election. When the scheduler triggers an ElectionTimeout
+ // message is sent to itself
+ electionCancel =
+ context.getActorSystem().scheduler().scheduleOnce(interval,
+ context.getActor(), new ElectionTimeout(),
+ context.getActorSystem().dispatcher(), context.getActor());
+ }
+
+ protected long currentTerm(){
+ return context.getTermInformation().getCurrentTerm();
+ }
+
+ protected String votedFor(){
+ return context.getTermInformation().getVotedFor();
+ }
+
+ protected ActorRef actor(){
+ return context.getActor();
+ }
+
+ protected long lastTerm() {
+ return context.getReplicatedLog().last().getTerm();
+ }
+
+ protected long lastIndex() {
+ return context.getReplicatedLog().last().getIndex();
+ }
+
+
@Override
public RaftState handleMessage(ActorRef sender, Object message) {
RaftState raftState = state();
}
if (message instanceof AppendEntries) {
AppendEntries appendEntries = (AppendEntries) message;
- if (appendEntries.getLeaderCommit() > context.getLastApplied()
- .get()) {
+ if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
applyLogToStateMachine(appendEntries.getLeaderCommit());
}
raftState = handleAppendEntries(sender, appendEntries, raftState);
}
private RaftState applyTerm(RaftRPC rpc) {
- if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()
- .get()) {
+ // If RPC request or response contains term T > currentTerm:
+ // set currentTerm = T, convert to follower (§5.1)
+ // This applies to all RPC messages and responses
+ if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
context.getTermInformation().update(rpc.getTerm(), null);
return RaftState.Follower;
}
}
private void applyLogToStateMachine(long index) {
- context.getLastApplied().set(index);
+ // Send a local message to the local RaftActor (it's derived class to be
+ // specific to apply the log to it's index)
+ context.setLastApplied(index);
}
+
+
}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import akka.actor.Cancellable;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import scala.concurrent.duration.FiniteDuration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
/**
* The behavior of a RaftActor when it is in the CandidateState
*/
public class Candidate extends AbstractRaftActorBehavior {
- /**
- * The maximum election time variance
- */
- private static final int ELECTION_TIME_MAX_VARIANCE = 100;
-
- /**
- * The interval in which a new election would get triggered if no leader is found
- */
- private static final long ELECTION_TIME_INTERVAL = Leader.HEART_BEAT_INTERVAL.toMillis() * 2;
-
- /**
- *
- */
private final Map<String, ActorSelection> peerToActor = new HashMap<>();
- private Cancellable electionCancel = null;
-
private int voteCount;
private final int votesRequired;
votesRequired = 0;
}
- scheduleElection(randomizedDuration());
+ startNewTerm();
+ scheduleElection(electionDuration());
}
@Override protected RaftState handleAppendEntries(ActorRef sender,
// There is some peer who thinks it's a leader but is not
// I will not accept this append entries
sender.tell(new AppendEntriesReply(
- context.getTermInformation().getCurrentTerm().get(), false),
+ context.getTermInformation().getCurrentTerm(), false),
context.getActor());
return suggestedState;
return suggestedState;
}
- @Override protected RaftState handleRequestVote(ActorRef sender,
- RequestVote requestVote, RaftState suggestedState) {
-
- // We got this RequestVote because the term in there is less than
- // or equal to our current term, so do not grant the vote
- sender.tell(new RequestVoteReply(
- context.getTermInformation().getCurrentTerm().get(), false),
- context.getActor());
-
- return suggestedState;
- }
-
@Override protected RaftState handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply, RaftState suggestedState) {
if(suggestedState == RaftState.Follower) {
// to send a message to the candidate
return RaftState.Leader;
}
- scheduleElection(randomizedDuration());
+ startNewTerm();
+ scheduleElection(electionDuration());
return state();
}
return super.handleMessage(sender, message);
}
- private FiniteDuration randomizedDuration(){
- long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
- return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, TimeUnit.MILLISECONDS);
- }
-
- private void scheduleElection(FiniteDuration interval) {
+ private void startNewTerm(){
// set voteCount back to 1 (that is voting for self)
voteCount = 1;
// Increment the election term and vote for self
- AtomicLong currentTerm = context.getTermInformation().getCurrentTerm();
- context.getTermInformation().update(currentTerm.incrementAndGet(), context.getId());
+ long currentTerm = context.getTermInformation().getCurrentTerm();
+ context.getTermInformation().update(currentTerm+1, context.getId());
// Request for a vote
for(ActorSelection peerActor : peerToActor.values()){
peerActor.tell(new RequestVote(
- context.getTermInformation().getCurrentTerm().get(),
- context.getId(), context.getReplicatedLog().last().getIndex(),
- context.getReplicatedLog().last().getTerm()),
+ context.getTermInformation().getCurrentTerm(),
+ context.getId(), context.getReplicatedLog().last().getIndex(),
+ context.getReplicatedLog().last().getTerm()),
context.getActor());
}
- if (electionCancel != null && !electionCancel.isCancelled()) {
- electionCancel.cancel();
- }
- // Schedule an election. When the scheduler triggers an ElectionTimeout
- // message is sent to itself
- electionCancel =
- context.getActorSystem().scheduler().scheduleOnce(interval,
- context.getActor(), new ElectionTimeout(),
- context.getActorSystem().dispatcher(), context.getActor());
}
}
import akka.actor.ActorRef;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
-import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
/**
public class Follower extends AbstractRaftActorBehavior {
public Follower(RaftActorContext context) {
super(context);
+
+ scheduleElection(electionDuration());
}
@Override protected RaftState handleAppendEntries(ActorRef sender,
return suggestedState;
}
- @Override protected RaftState handleRequestVote(ActorRef sender,
- RequestVote requestVote, RaftState suggestedState) {
- return suggestedState;
- }
-
@Override protected RaftState handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply, RaftState suggestedState) {
return suggestedState;
}
@Override public RaftState handleMessage(ActorRef sender, Object message) {
+ if(message instanceof ElectionTimeout){
+ return RaftState.Candidate;
+ }
+
+ scheduleElection(electionDuration());
+
return super.handleMessage(sender, message);
}
}
import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
-import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import scala.concurrent.duration.FiniteDuration;
return suggestedState;
}
- @Override protected RaftState handleRequestVote(ActorRef sender,
- RequestVote requestVote, RaftState suggestedState) {
- return suggestedState;
- }
-
@Override protected RaftState handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply, RaftState suggestedState) {
return suggestedState;
if (message instanceof SendHeartBeat) {
for (ActorSelection follower : followerToActor.values()) {
follower.tell(new AppendEntries(
- context.getTermInformation().getCurrentTerm().get(),
+ context.getTermInformation().getCurrentTerm(),
context.getId(),
context.getReplicatedLog().last().getIndex(),
context.getReplicatedLog().last().getTerm(),
- Collections.EMPTY_LIST, context.getCommitIndex().get()),
+ Collections.EMPTY_LIST, context.getCommitIndex()),
context.getActor());
}
return state();
import akka.actor.ActorSystem;
import akka.actor.Props;
-import java.util.concurrent.atomic.AtomicLong;
-
public class MockRaftActorContext implements RaftActorContext {
private String id;
private ActorSystem system;
private ActorRef actor;
- private AtomicLong index = new AtomicLong(0);
- private AtomicLong lastApplied = new AtomicLong(0);
+ private long index = 0;
+ private long lastApplied = 0;
private final ElectionTerm electionTerm;
+ private ReplicatedLog replicatedLog;
public MockRaftActorContext(){
electionTerm = null;
+
+ initReplicatedLog();
}
public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){
this.actor = actor;
electionTerm = new ElectionTermImpl(id);
+
+ initReplicatedLog();
+ }
+
+
+ public void initReplicatedLog(){
+ MockReplicatedLog mockReplicatedLog = new MockReplicatedLog();
+ this.replicatedLog = mockReplicatedLog;
+ mockReplicatedLog.setLast(new MockReplicatedLogEntry(1,1,""));
+ mockReplicatedLog.setReplicatedLogEntry(new MockReplicatedLogEntry(1,1, ""));
}
@Override public ActorRef actorOf(Props props) {
return electionTerm;
}
- public void setIndex(AtomicLong index){
+ public void setIndex(long index){
this.index = index;
}
- @Override public AtomicLong getCommitIndex() {
+ @Override public long getCommitIndex() {
return index;
}
- public void setLastApplied(AtomicLong lastApplied){
+ @Override public void setCommitIndex(long commitIndex) {
+ this.index = commitIndex;
+ }
+
+ @Override public void setLastApplied(long lastApplied){
this.lastApplied = lastApplied;
}
- @Override public AtomicLong getLastApplied() {
+ @Override public long getLastApplied() {
return lastApplied;
}
+ public void setReplicatedLog(ReplicatedLog replicatedLog) {
+ this.replicatedLog = replicatedLog;
+ }
+
@Override public ReplicatedLog getReplicatedLog() {
- return new ReplicatedLog(){
+ return replicatedLog;
+ }
- @Override public ReplicatedLogEntry getReplicatedLogEntry(
- long index) {
- throw new UnsupportedOperationException(
- "getReplicatedLogEntry");
- }
+ @Override public ActorSystem getActorSystem() {
+ return this.system;
+ }
+
+
+ public static class MockReplicatedLog implements ReplicatedLog {
+ private ReplicatedLogEntry replicatedLogEntry = new MockReplicatedLogEntry(0,0, "");
+ private ReplicatedLogEntry last = new MockReplicatedLogEntry(0,0, "");
- @Override public ReplicatedLogEntry last() {
- return new ReplicatedLogEntry() {
- @Override public Object getData() {
- return null;
- }
+ @Override public ReplicatedLogEntry getReplicatedLogEntry(long index) {
+ return replicatedLogEntry;
+ }
- @Override public long getTerm() {
- return 1;
- }
+ @Override public ReplicatedLogEntry last() {
+ return last;
+ }
- @Override public long getIndex() {
- return 1;
- }
- };
- }
- };
+ public void setReplicatedLogEntry(
+ ReplicatedLogEntry replicatedLogEntry) {
+ this.replicatedLogEntry = replicatedLogEntry;
+ }
+
+ public void setLast(ReplicatedLogEntry last) {
+ this.last = last;
+ }
}
- @Override public ActorSystem getActorSystem() {
- return this.system;
+ public static class MockReplicatedLogEntry implements ReplicatedLogEntry {
+
+ private final long term;
+ private final long index;
+ private final Object data;
+
+ public MockReplicatedLogEntry(long term, long index, Object data){
+
+ this.term = term;
+ this.index = index;
+ this.data = data;
+ }
+
+ @Override public Object getData() {
+ return data;
+ }
+
+ @Override public long getTerm() {
+ return term;
+ }
+
+ @Override public long getIndex() {
+ return index;
+ }
}
}
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import akka.actor.Props;
import akka.testkit.JavaTestKit;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.AbstractActorTest;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertEquals;
public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{
+
+ private final ActorRef behaviorActor = getSystem().actorOf(Props.create(
+ DoNothingActor.class));
+
@Test
public void testHandlingOfRaftRPCWithNewerTerm() throws Exception {
new JavaTestKit(getSystem()) {{
RaftActorContext context =
createActorContext();
- ((MockRaftActorContext) context).setLastApplied(new AtomicLong(100));
+ ((MockRaftActorContext) context).setLastApplied(100);
AppendEntries appendEntries =
new AppendEntries(100, "leader-1", 0, 0, null, 101);
RaftState raftState =
createBehavior(context).handleMessage(getRef(), appendEntries);
- assertEquals(new AtomicLong(101).get(), context.getLastApplied().get());
+ assertEquals(new AtomicLong(101).get(), context.getLastApplied());
+
+ }};
+ }
+
+ @Test
+ public void testHandleRequestVoteWhenSenderTermGreaterThanCurrentTermAndSenderLogMoreUpToDate(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ RaftActorBehavior follower = createBehavior(
+ createActorContext(behaviorActor));
+
+ follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
+
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply = (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(true, out);
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testHandleRequestVoteWhenSenderTermGreaterThanCurrentTermButSenderLogLessUptoDate(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ RaftActorContext actorContext =
+ createActorContext(behaviorActor);
+
+ MockRaftActorContext.MockReplicatedLog log = new MockRaftActorContext.MockReplicatedLog();
+ log.setReplicatedLogEntry(new MockRaftActorContext.MockReplicatedLogEntry(20000, 1000000, ""));
+ log.setLast(
+ new MockRaftActorContext.MockReplicatedLogEntry(20000,
+ 1000000, ""));
+
+ ((MockRaftActorContext) actorContext).setReplicatedLog(log);
+
+ RaftActorBehavior follower = createBehavior(actorContext);
+
+ follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply = (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(false, out);
+ }
+ };
+ }};
+ }
+
+
+
+
+ @Test
+ public void testHandleRequestVoteWhenSenderTermLessThanCurrentTerm(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ RaftActorContext context = createActorContext(behaviorActor);
+
+ context.getTermInformation().update(1000, null);
+
+ RaftActorBehavior follower = createBehavior(context);
+
+ follower.handleMessage(getTestActor(), new RequestVote(999, "test", 10000, 999));
+
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply = (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(false, out);
+ }
+ };
}};
}
return new MockRaftActorContext();
}
+ protected RaftActorContext createActorContext(ActorRef actor) {
+ return new MockRaftActorContext("test", getSystem(), actor);
+ }
+
protected AppendEntries createAppendEntriesWithNewerTerm(){
return new AppendEntries(100, "leader-1", 0, 0, null, 1);
}
return new RequestVoteReply(100, false);
}
+
+
}
@Test
public void testWhenACandidateIsCreatedItIncrementsTheCurrentTermAndVotesForItself(){
RaftActorContext raftActorContext = createActorContext();
- long expectedTerm = raftActorContext.getTermInformation().getCurrentTerm().get();
+ long expectedTerm = raftActorContext.getTermInformation().getCurrentTerm();
new Candidate(raftActorContext, Collections.EMPTY_LIST);
- assertEquals(expectedTerm+1, raftActorContext.getTermInformation().getCurrentTerm().get());
+ assertEquals(expectedTerm+1, raftActorContext.getTermInformation().getCurrentTerm());
assertEquals(raftActorContext.getId(), raftActorContext.getTermInformation().getVotedFor());
}
}};
}
+ @Test
+ public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ RaftActorContext context = createActorContext(getTestActor());
+
+ context.getTermInformation().update(1000, null);
+
+ // Once a candidate is created it will immediately increment the current term so after
+ // construction the currentTerm should be 1001
+ RaftActorBehavior follower = createBehavior(context);
+
+ follower.handleMessage(getTestActor(), new RequestVote(1001, "test", 10000, 999));
+
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply = (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(true, out);
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ RaftActorContext context = createActorContext(getTestActor());
+
+ context.getTermInformation().update(1000, "test");
+
+ RaftActorBehavior follower = createBehavior(context);
+
+ follower.handleMessage(getTestActor(), new RequestVote(1001, "candidate", 10000, 999));
+
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply = (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(false, out);
+ }
+ };
+ }};
+ }
+
@Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
return new MockRaftActorContext("test", getSystem(), candidateActor);
}
- protected RaftActorContext createActorContext(ActorRef candidateActor) {
- return new MockRaftActorContext("test", getSystem(), candidateActor);
- }
}
import akka.actor.ActorRef;
import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import junit.framework.Assert;
+import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import static org.junit.Assert.assertEquals;
+
public class FollowerTest extends AbstractRaftActorBehaviorTest {
private final ActorRef followerActor = getSystem().actorOf(Props.create(
return new MockRaftActorContext("test", getSystem(), followerActor);
}
+ @Test
+ public void testThatAnElectionTimeoutIsTriggered(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ Follower follower = new Follower(createActorContext(getTestActor()));
+
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "ElectionTimeout") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof ElectionTimeout) {
+ return true;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(true, out);
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testHandleElectionTimeout(){
+ RaftActorContext raftActorContext = createActorContext();
+ Follower follower =
+ new Follower(raftActorContext);
+
+ RaftState raftState =
+ follower.handleMessage(followerActor, new ElectionTimeout());
+
+ Assert.assertEquals(RaftState.Candidate, raftState);
+ }
+
+ @Test
+ public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ RaftActorContext context = createActorContext(getTestActor());
+
+ context.getTermInformation().update(1000, null);
+
+ RaftActorBehavior follower = createBehavior(context);
+
+ follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
+
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply = (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(true, out);
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ RaftActorContext context = createActorContext(getTestActor());
+
+ context.getTermInformation().update(1000, "test");
+
+ RaftActorBehavior follower = createBehavior(context);
+
+ follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999));
+
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply = (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(false, out);
+ }
+ };
+ }};
+ }
+
}