+ @Test
+ public void testReplicationConsensusWithNonVotingFollower() {
+ logStart("testReplicationConsensusWithNonVotingFollower");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ String nonVotingFollowerId = "nonvoting-follower";
+ TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
+ Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
+
+ leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
+
+ leader = new Leader(leaderActorContext);
+
+ // Ignore initial heartbeats
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(nonVotingFollowerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Send a Replicate message and wait for AppendEntries.
+ sendReplicate(leaderActorContext, 0);
+
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
+
+ // Send reply only from the voting follower and verify consensus via ApplyState.
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+
+ MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(nonVotingFollowerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Send another Replicate message
+ sendReplicate(leaderActorContext, 1);
+
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
+ AppendEntries.class);
+ assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
+
+ // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
+ leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
+
+ MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
+
+ // Send reply from the voting follower and verify consensus.
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
+
+ MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+ }
+
+ @Test
+ public void testTransferLeadershipWithFollowerInSync() {
+ logStart("testTransferLeadershipWithFollowerInSync");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ leader = new Leader(leaderActorContext);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ sendReplicate(leaderActorContext, 0);
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+ MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+
+ // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
+ MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+
+ // Leader should force an election timeout
+ MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+ verify(mockTransferCohort).transferComplete();
+ }
+
+ @Test
+ public void testTransferLeadershipWithEmptyLog() {
+ logStart("testTransferLeadershipWithEmptyLog");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ leader = new Leader(leaderActorContext);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+
+ // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // Leader should force an election timeout
+ MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+ verify(mockTransferCohort).transferComplete();
+ }
+
+ @Test
+ public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
+ logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(200, TimeUnit.MILLISECONDS));
+
+ leader = new Leader(leaderActorContext);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+
+ // Sync up the follower.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+ getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
+
+ // Leader should force an election timeout
+ MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+ verify(mockTransferCohort).transferComplete();
+ }
+
+ @Test
+ public void testTransferLeadershipWithFollowerSyncTimeout() {
+ logStart("testTransferLeadershipWithFollowerSyncTimeout");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(200, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ leader = new Leader(leaderActorContext);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ sendReplicate(leaderActorContext, 0);
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+
+ // Send heartbeats to time out the transfer.
+ for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+ getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+ }
+
+ verify(mockTransferCohort).abortTransfer();
+ verify(mockTransferCohort, never()).transferComplete();
+ MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
+ }
+