import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
+ return sendReplicate(actorContext, 1, index);
+ }
+
+ private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
- 1, index, payload);
+ term, index, payload);
actorContext.getReplicatedLog().append(newEntry);
return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
}
assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
}
+ @Test
+ public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
+ logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+
+ // The raft context is initialized with a couple log entries. However the commitIndex
+ // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
+ // committed and applied. Now it regains leadership with a higher term (2).
+ long prevTerm = actorContext.getTermInformation().getCurrentTerm();
+ long newTerm = prevTerm + 1;
+ actorContext.getTermInformation().update(newTerm, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower replies with the leader's current last index and term, simulating that it is
+ // up to date with the leader.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
+
+ // The commit index should not get updated even though consensus was reached. This is b/c the
+ // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
+ // from previous terms by counting replicas".
+ assertEquals("Commit Index", -1, actorContext.getCommitIndex());
+
+ followerActor.underlyingActor().clear();
+
+ // Now replicate a new entry with the new term 2.
+ long newIndex = lastIndex + 1;
+ sendReplicate(actorContext, newTerm, newIndex);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+
+ // The follower replies with success. The leader should now update the commit index to the new index
+ // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
+ // prior entries are committed indirectly".
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
+
+ assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
+ }
+
@Test
public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
// Send reply from the voting follower and verify consensus.
- leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
+
+ MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+ }
+
+ @Test
+ public void testTransferLeadershipWithFollowerInSync() {
+ logStart("testTransferLeadershipWithFollowerInSync");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ leader = new Leader(leaderActorContext);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ sendReplicate(leaderActorContext, 0);
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+
+ // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
+ MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+
+ // Leader should force an election timeout
+ MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+ verify(mockTransferCohort).transferComplete();
+ }
+
+ @Test
+ public void testTransferLeadershipWithEmptyLog() {
+ logStart("testTransferLeadershipWithEmptyLog");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ leader = new Leader(leaderActorContext);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+
+ // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // Leader should force an election timeout
+ MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+ verify(mockTransferCohort).transferComplete();
+ }
+
+ @Test
+ public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
+ logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(200, TimeUnit.MILLISECONDS));
+
+ leader = new Leader(leaderActorContext);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+
+ // Sync up the follower.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+ getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
+
+ // Leader should force an election timeout
+ MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+ verify(mockTransferCohort).transferComplete();
+ }
+
+ @Test
+ public void testTransferLeadershipWithFollowerSyncTimeout() {
+ logStart("testTransferLeadershipWithFollowerSyncTimeout");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(200, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ leader = new Leader(leaderActorContext);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ sendReplicate(leaderActorContext, 0);
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+
+ // Send heartbeats to time out the transfer.
+ for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+ getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+ }
+
+ verify(mockTransferCohort).abortTransfer();
+ verify(mockTransferCohort, never()).transferComplete();
+ MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
}
@Override