+ @Test
+ public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
+ logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
+ long leaderCommitIndex = 1;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+ final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+ followerActorContext.setCurrentBehavior(follower);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+ AppendEntriesReply.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+
+ leaderActor.underlyingActor().setBehavior(leader);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 0, appendEntries.getEntries().get(0).index());
+ assertEquals("First entry data", leadersFirstLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).index());
+ assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
+
+ List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+ ApplyState applyState = applyStateList.get(0);
+ assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().index());
+ assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().term());
+ assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ applyState = applyStateList.get(1);
+ assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().index());
+ assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().term());
+ assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
+ logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
+ long leaderCommitIndex = 1;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+ final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+ followerActorContext.setCurrentBehavior(follower);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+ AppendEntriesReply.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+
+ leaderActor.underlyingActor().setBehavior(leader);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 0, appendEntries.getEntries().get(0).index());
+ assertEquals("First entry term", 2, appendEntries.getEntries().get(0).term());
+ assertEquals("First entry data", leadersFirstLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).index());
+ assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).term());
+ assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
+
+ List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+ ApplyState applyState = applyStateList.get(0);
+ assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().index());
+ assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().term());
+ assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ applyState = applyStateList.get(1);
+ assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().index());
+ assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().term());
+ assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
+ assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyWithNewerTerm() {
+ logStart("testHandleAppendEntriesReplyWithNewerTerm");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(10000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
+
+ leader = new Leader(leaderActorContext);
+ leaderActor.underlyingActor().setBehavior(leader);
+ leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
+
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+ AppendEntriesReply.class);
+
+ assertEquals(false, appendEntriesReply.isSuccess());
+ assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
+
+ MessageCollectorActor.clearMessages(leaderActor);
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
+ logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(10000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
+ leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
+
+ leader = new Leader(leaderActorContext);
+ leaderActor.underlyingActor().setBehavior(leader);
+ leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());