import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.ElectionTerm;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorContextImpl;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class CandidateTest extends AbstractRaftActorBehaviorTest {
+public class CandidateTest extends AbstractRaftActorBehaviorTest<Candidate> {
+ static final Logger LOG = LoggerFactory.getLogger(CandidateTest.class);
private final TestActorRef<MessageCollectorActor> candidateActor = actorFactory.createTestActor(
Props.create(MessageCollectorActor.class), actorFactory.generateActorId("candidate"));
@Override
@After
- public void tearDown() throws Exception {
- if(candidate != null) {
+ public void tearDown() {
+ if (candidate != null) {
candidate.close();
}
}
@Test
- public void testWhenACandidateIsCreatedItIncrementsTheCurrentTermAndVotesForItself(){
+ public void testWhenACandidateIsCreatedItIncrementsTheCurrentTermAndVotesForItself() {
RaftActorContext raftActorContext = createActorContext();
long expectedTerm = raftActorContext.getTermInformation().getCurrentTerm();
candidate = new Candidate(raftActorContext);
- assertEquals("getCurrentTerm", expectedTerm+1, raftActorContext.getTermInformation().getCurrentTerm());
+ assertEquals("getCurrentTerm", expectedTerm + 1, raftActorContext.getTermInformation().getCurrentTerm());
assertEquals("getVotedFor", raftActorContext.getId(), raftActorContext.getTermInformation().getVotedFor());
}
@Test
- public void testThatAnElectionTimeoutIsTriggered(){
- MockRaftActorContext actorContext = createActorContext();
- candidate = new Candidate(actorContext);
+ public void testThatAnElectionTimeoutIsTriggered() {
+ MockRaftActorContext actorContext = createActorContext();
+ candidate = new Candidate(actorContext);
- MessageCollectorActor.expectFirstMatching(candidateActor, ElectionTimeout.class,
- actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
+ MessageCollectorActor.expectFirstMatching(candidateActor, ElectionTimeout.class,
+ actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
}
@Test
- public void testHandleElectionTimeoutWhenThereAreZeroPeers(){
+ public void testHandleElectionTimeoutWhenThereAreZeroPeers() {
RaftActorContext raftActorContext = createActorContext();
candidate = new Candidate(raftActorContext);
RaftActorBehavior newBehavior =
- candidate.handleMessage(candidateActor, new ElectionTimeout());
+ candidate.handleMessage(candidateActor, ElectionTimeout.INSTANCE);
assertEquals("Behavior", RaftState.Leader, newBehavior.state());
}
@Test
- public void testHandleElectionTimeoutWhenThereAreTwoNodeCluster(){
+ public void testHandleElectionTimeoutWhenThereAreTwoNodeCluster() {
MockRaftActorContext raftActorContext = createActorContext();
raftActorContext.setPeerAddresses(setupPeers(1));
candidate = new Candidate(raftActorContext);
- candidate = candidate.handleMessage(candidateActor, new ElectionTimeout());
+ candidate = candidate.handleMessage(candidateActor, ElectionTimeout.INSTANCE);
assertEquals("Behavior", RaftState.Candidate, candidate.state());
}
@Test
- public void testBecomeLeaderOnReceivingMajorityVotesInThreeNodeCluster(){
+ public void testBecomeLeaderOnReceivingMajorityVotesInThreeNodeCluster() {
MockRaftActorContext raftActorContext = createActorContext();
+ raftActorContext.setLastApplied(raftActorContext.getReplicatedLog().lastIndex());
raftActorContext.setPeerAddresses(setupPeers(2));
candidate = new Candidate(raftActorContext);
}
@Test
- public void testBecomeLeaderOnReceivingMajorityVotesInFiveNodeCluster(){
+ public void testBecomePreLeaderOnReceivingMajorityVotesInThreeNodeCluster() {
+ MockRaftActorContext raftActorContext = createActorContext();
+ raftActorContext.setLastApplied(-1);
+ raftActorContext.setPeerAddresses(setupPeers(2));
+ candidate = new Candidate(raftActorContext);
+
+ candidate = candidate.handleMessage(peerActors[0], new RequestVoteReply(1, true));
+
+ // LastApplied is -1 and behind the last index.
+ assertEquals("Behavior", RaftState.PreLeader, candidate.state());
+ }
+
+ @Test
+ public void testBecomeLeaderOnReceivingMajorityVotesInFiveNodeCluster() {
MockRaftActorContext raftActorContext = createActorContext();
+ raftActorContext.getTermInformation().update(2L, "other");
+ raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder()
+ .createEntries(0, 5, 1).build());
+ raftActorContext.setCommitIndex(raftActorContext.getReplicatedLog().lastIndex());
+ raftActorContext.setLastApplied(raftActorContext.getReplicatedLog().lastIndex());
raftActorContext.setPeerAddresses(setupPeers(4));
candidate = new Candidate(raftActorContext);
+ RequestVote requestVote = MessageCollectorActor.expectFirstMatching(peerActors[0], RequestVote.class);
+ assertEquals("getTerm", 3L, requestVote.getTerm());
+ assertEquals("getCandidateId", raftActorContext.getId(), requestVote.getCandidateId());
+ assertEquals("getLastLogTerm", 1L, requestVote.getLastLogTerm());
+ assertEquals("getLastLogIndex", 4L, requestVote.getLastLogIndex());
+
+ MessageCollectorActor.expectFirstMatching(peerActors[1], RequestVote.class);
+ MessageCollectorActor.expectFirstMatching(peerActors[2], RequestVote.class);
+ MessageCollectorActor.expectFirstMatching(peerActors[3], RequestVote.class);
+
// First peers denies the vote.
candidate = candidate.handleMessage(peerActors[0], new RequestVoteReply(1, false));
assertEquals("Behavior", RaftState.Leader, candidate.state());
}
+ @Test
+ public void testBecomeLeaderOnReceivingMajorityVotesWithNonVotingPeers() {
+ ElectionTerm mockElectionTerm = Mockito.mock(ElectionTerm.class);
+ Mockito.doReturn(1L).when(mockElectionTerm).getCurrentTerm();
+ RaftActorContext raftActorContext = new RaftActorContextImpl(candidateActor, candidateActor.actorContext(),
+ "candidate", mockElectionTerm, -1, -1, setupPeers(4), new DefaultConfigParamsImpl(),
+ new NonPersistentDataProvider(Runnable::run), applyState -> { }, LOG);
+ raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ raftActorContext.getPeerInfo("peer1").setVotingState(VotingState.NON_VOTING);
+ raftActorContext.getPeerInfo("peer4").setVotingState(VotingState.NON_VOTING);
+ candidate = new Candidate(raftActorContext);
+
+ MessageCollectorActor.expectFirstMatching(peerActors[1], RequestVote.class);
+ MessageCollectorActor.expectFirstMatching(peerActors[2], RequestVote.class);
+ MessageCollectorActor.assertNoneMatching(peerActors[0], RequestVote.class, 300);
+ MessageCollectorActor.assertNoneMatching(peerActors[3], RequestVote.class, 100);
+
+ candidate = candidate.handleMessage(peerActors[1], new RequestVoteReply(1, false));
+
+ assertEquals("Behavior", RaftState.Candidate, candidate.state());
+
+ candidate = candidate.handleMessage(peerActors[2], new RequestVoteReply(1, true));
+
+ assertEquals("Behavior", RaftState.Leader, candidate.state());
+ }
+
@Test
public void testResponseToHandleAppendEntriesWithLowerTerm() {
candidate = new Candidate(createActorContext());
}
@Test
- public void testCandidateSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
+ public void testCandidateSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
MockRaftActorContext context = createActorContext();
Stopwatch stopwatch = Stopwatch.createStarted();
@Test
@Override
- public void testHandleAppendEntriesAddSameEntryToLog() throws Exception {
+ public void testHandleAppendEntriesAddSameEntryToLog() {
MockRaftActorContext context = createActorContext();
context.getTermInformation().update(2, "test");
setLastLogEntry(context, 2, 0, payload);
List<ReplicatedLogEntry> entries = new ArrayList<>();
- entries.add(new MockRaftActorContext.MockReplicatedLogEntry(2, 0, payload));
+ entries.add(new SimpleReplicatedLogEntry(0, 2, payload));
- AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 2, -1, (short)0);
+ final AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 2, -1, (short)0);
behavior = createBehavior(context);
context.getTermInformation().update(2, "test");
// Send an unknown message so that the state of the RaftActor remains unchanged
- RaftActorBehavior expected = behavior.handleMessage(candidateActor, "unknown");
+ behavior.handleMessage(candidateActor, "unknown");
RaftActorBehavior raftBehavior = behavior.handleMessage(candidateActor, appendEntries);
}
@Override
- protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
+ protected Candidate createBehavior(final RaftActorContext actorContext) {
return new Candidate(actorContext);
}
- @Override protected MockRaftActorContext createActorContext() {
+ @Override
+ protected MockRaftActorContext createActorContext() {
return new MockRaftActorContext("candidate", getSystem(), candidateActor);
}
- private Map<String, String> setupPeers(int count) {
+ @SuppressWarnings("unchecked")
+ private Map<String, String> setupPeers(final int count) {
Map<String, String> peerMap = new HashMap<>();
peerActors = new TestActorRef[count];
- for(int i = 0; i < count; i++) {
+ for (int i = 0; i < count; i++) {
peerActors[i] = actorFactory.createTestActor(Props.create(MessageCollectorActor.class),
actorFactory.generateActorId("peer"));
- peerMap.put("peer" + (i+1), peerActors[i].path().toString());
+ peerMap.put("peer" + (i + 1), peerActors[i].path().toString());
}
return peerMap;
}
@Override
- protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
- ActorRef actorRef, RaftRPC rpc) throws Exception {
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
+ final ActorRef actorRef, final RaftRPC rpc) {
super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
- assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
+ if (rpc instanceof RequestVote) {
+ assertEquals("New votedFor", ((RequestVote)rpc).getCandidateId(),
+ actorContext.getTermInformation().getVotedFor());
+ } else {
+ assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
+ }
}
}