+ assertEquals(2, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+ assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
+ }
+
+ @Test
+ public void testFollowerToSnapshotLogic() {
+ logStart("testFollowerToSnapshotLogic");
+
+ MockRaftActorContext actorContext = createActorContext();
+
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ });
+
+ leader = new Leader(actorContext);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ ByteString bs = toByteString(leadersSnapshot);
+ byte[] barray = bs.toByteArray();
+
+ FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+ leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+
+ assertEquals(bs.size(), barray.length);
+
+ int chunkIndex=0;
+ for (int i=0; i < barray.length; i = i + 50) {
+ int j = i + 50;
+ chunkIndex++;
+
+ if (i + 50 > barray.length) {
+ j = barray.length;
+ }
+
+ byte[] chunk = fts.getNextChunk();
+ assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
+ assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
+
+ fts.markSendStatus(true);
+ if (!fts.isLastChunk(chunkIndex)) {
+ fts.incrementChunkIndex();
+ }
+ }
+
+ assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
+ }
+
+ @Override
+ protected Leader createBehavior(final RaftActorContext actorContext) {
+ return new Leader(actorContext);
+ }
+
+ @Override
+ protected MockRaftActorContext createActorContext() {
+ return createActorContext(leaderActor);
+ }
+
+ @Override
+ protected MockRaftActorContext createActorContext(ActorRef actorRef) {
+ return createActorContext(LEADER_ID, actorRef);
+ }
+
+ private MockRaftActorContext createActorContextWithFollower() {
+ MockRaftActorContext actorContext = createActorContext();
+ actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
+ followerActor.path().toString()).build());
+ return actorContext;
+ }
+
+ private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
+ configParams.setElectionTimeoutFactor(100000);
+ MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
+ context.setConfigParams(configParams);
+ context.setPayloadVersion(payloadVersion);
+ return context;
+ }
+
+ private MockRaftActorContext createFollowerActorContextWithLeader() {
+ MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+ DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
+ followerConfig.setElectionTimeoutFactor(10000);
+ followerActorContext.setConfigParams(followerConfig);
+ followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
+ return followerActorContext;
+ }
+
+ @Test
+ public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
+ logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+
+ MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+ followerActorContext.setCurrentBehavior(follower);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+
+ //create 3 entries
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ leaderActorContext.setCommitIndex(1);
+
+ followerActorContext.getReplicatedLog().removeFrom(0);
+
+ // follower too has the exact same log entries and has the same commit index
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ followerActorContext.setCommitIndex(1);
+
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(0, appendEntries.getEntries().size());
+ assertEquals(0, appendEntries.getPrevLogIndex());
+
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
+ leaderActor, AppendEntriesReply.class);
+
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ // follower returns its next index
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ follower.close();
+ }
+
+ @Test
+ public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
+ logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+
+ MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+ followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+ followerActorContext.setCurrentBehavior(follower);
+
+ Map<String, String> leaderPeerAddresses = new HashMap<>();
+ leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(leaderPeerAddresses);
+
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ leaderActorContext.setCommitIndex(1);
+
+ followerActorContext.getReplicatedLog().removeFrom(0);
+
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ // follower has the same log entries but its commit index > leaders commit index
+ followerActorContext.setCommitIndex(2);
+
+ leader = new Leader(leaderActorContext);
+
+ // Initial heartbeat
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(0, appendEntries.getEntries().size());
+ assertEquals(0, appendEntries.getPrevLogIndex());
+
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
+ leaderActor, AppendEntriesReply.class);
+
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ leaderActor.underlyingActor().setBehavior(follower);
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ leaderActor.underlyingActor().clear();
+ followerActor.underlyingActor().clear();
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals(2, appendEntries.getLeaderCommit());
+ assertEquals(0, appendEntries.getEntries().size());
+ assertEquals(2, appendEntries.getPrevLogIndex());
+
+ appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ assertEquals(2, followerActorContext.getCommitIndex());
+
+ follower.close();
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
+ logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ long leaderCommitIndex = 2;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+ ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+ followerActorContext.setCommitIndex(0);
+ followerActorContext.setLastApplied(0);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry data", leadersSecondLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry data", leadersThirdLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
+
+ List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+ ApplyState applyState = applyStateList.get(0);
+ assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ applyState = applyStateList.get(1);
+ assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
+ logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
+ long leaderCommitIndex = 1;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+ ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+ followerActorContext.setCurrentBehavior(follower);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+
+ leaderActor.underlyingActor().setBehavior(leader);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry data", leadersFirstLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
+
+ List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+ ApplyState applyState = applyStateList.get(0);
+ assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ applyState = applyStateList.get(1);
+ assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
+ logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
+ long leaderCommitIndex = 1;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+ ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+ followerActorContext.setCurrentBehavior(follower);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+
+ leaderActor.underlyingActor().setBehavior(leader);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("First entry data", leadersFirstLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
+ assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
+
+ List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+ ApplyState applyState = applyStateList.get(0);
+ assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ applyState = applyStateList.get(1);
+ assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
+ assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyWithNewerTerm(){
+ logStart("testHandleAppendEntriesReplyWithNewerTerm");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(10000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
+
+ leader = new Leader(leaderActorContext);
+ leaderActor.underlyingActor().setBehavior(leader);
+ leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
+
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertEquals(false, appendEntriesReply.isSuccess());
+ assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
+
+ MessageCollectorActor.clearMessages(leaderActor);
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
+ logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(10000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
+ leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
+
+ leader = new Leader(leaderActorContext);
+ leaderActor.underlyingActor().setBehavior(leader);
+ leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
+
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertEquals(false, appendEntriesReply.isSuccess());
+ assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
+
+ MessageCollectorActor.clearMessages(leaderActor);
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplySuccess() throws Exception {
+ logStart("testHandleAppendEntriesReplySuccess");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ leaderActorContext.setCommitIndex(1);
+ leaderActorContext.setLastApplied(1);
+ leaderActorContext.getTermInformation().update(1, "leader");
+
+ leader = new Leader(leaderActorContext);
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+
+ assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
+ assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
+
+ short payloadVersion = 5;
+ AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
+
+ RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+ assertEquals(2, leaderActorContext.getCommitIndex());
+
+ ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
+ leaderActor, ApplyJournalEntries.class);
+
+ assertEquals(2, leaderActorContext.getLastApplied());
+
+ assertEquals(2, applyJournalEntries.getToIndex());
+
+ List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
+ ApplyState.class);
+
+ assertEquals(1,applyStateList.size());
+
+ ApplyState applyState = applyStateList.get(0);
+
+ assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
+
+ assertEquals(2, followerInfo.getMatchIndex());
+ assertEquals(3, followerInfo.getNextIndex());
+ assertEquals(payloadVersion, followerInfo.getPayloadVersion());
+ assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyUnknownFollower(){
+ logStart("testHandleAppendEntriesReplyUnknownFollower");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
+
+ RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+ }
+
+ @Test
+ public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
+ logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
+ long leaderCommitIndex = 3;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+ ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+ ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+ ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+ followerActorContext.setCurrentBehavior(follower);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
+
+ leaderActor.underlyingActor().setBehavior(leader);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
+
+ appendEntries = appendEntriesList.get(0);
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry data", leadersFirstLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
+
+ appendEntries = appendEntriesList.get(1);
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry data", leadersThirdLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry data", leadersFourthLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
+
+ MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
+
+ assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
+ }
+
+ @Test
+ public void testHandleRequestVoteReply(){
+ logStart("testHandleRequestVoteReply");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+
+ leader = new Leader(leaderActorContext);
+
+ // Should be a no-op.
+ RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
+ new RequestVoteReply(1, true));
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+ raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+ }
+
+ @Test
+ public void testIsolatedLeaderCheckNoFollowers() {
+ logStart("testIsolatedLeaderCheckNoFollowers");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+
+ leader = new Leader(leaderActorContext);
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+ assertTrue(behavior instanceof Leader);
+ }
+
+ private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
+ ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
+ ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1", followerActor1.path().toString());
+ peerAddresses.put("follower-2", followerActor2.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+ leaderActorContext.setRaftPolicy(raftPolicy);
+
+ leader = new Leader(leaderActorContext);
+
+ leader.markFollowerActive("follower-1");
+ leader.markFollowerActive("follower-2");
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+ assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
+
+ // kill 1 follower and verify if that got killed
+ final JavaTestKit probe = new JavaTestKit(getSystem());
+ probe.watch(followerActor1);
+ followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
+ assertEquals(termMsg1.getActor(), followerActor1);
+
+ leader.markFollowerInActive("follower-1");
+ leader.markFollowerActive("follower-2");
+ behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+ assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
+
+ // kill 2nd follower and leader should change to Isolated leader
+ followerActor2.tell(PoisonPill.getInstance(), null);
+ probe.watch(followerActor2);
+ followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
+ assertEquals(termMsg2.getActor(), followerActor2);
+
+ leader.markFollowerInActive("follower-2");
+ return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+ }
+
+ @Test
+ public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
+ logStart("testIsolatedLeaderCheckTwoFollowers");
+
+ RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
+
+ assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
+ behavior instanceof IsolatedLeader);
+ }
+
+ @Test
+ public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
+ logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
+
+ RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
+
+ assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
+ behavior instanceof Leader);
+ }
+
+ @Test
+ public void testLaggingFollowerStarvation() throws Exception {
+ logStart("testLaggingFollowerStarvation");
+ new JavaTestKit(getSystem()) {{
+ String leaderActorId = actorFactory.generateActorId("leader");
+ String follower1ActorId = actorFactory.generateActorId("follower");
+ String follower2ActorId = actorFactory.generateActorId("follower");
+
+ TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
+ actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
+ ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
+ ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
+
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+ leaderActorContext.setConfigParams(configParams);
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(follower1ActorId,
+ follower1Actor.path().toString());
+ peerAddresses.put(follower2ActorId,
+ follower2Actor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+ leaderActorContext.getTermInformation().update(1, leaderActorId);
+
+ RaftActorBehavior leader = createBehavior(leaderActorContext);
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ for(int i=1;i<6;i++) {
+ // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
+ RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
+ assertTrue(newBehavior == leader);
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
+
+ // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
+ List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
+
+ assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
+ heartbeats.size() > 1);
+
+ // Check if follower-2 got AppendEntries during this time and was not starved
+ List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
+
+ assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
+ appendEntries.size() > 1);
+
+ }};
+ }
+
+ @Test
+ public void testReplicationConsensusWithNonVotingFollower() {
+ logStart("testReplicationConsensusWithNonVotingFollower");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ String nonVotingFollowerId = "nonvoting-follower";
+ TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
+ Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
+
+ leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
+
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Ignore initial heartbeats
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(nonVotingFollowerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Send a Replicate message and wait for AppendEntries.
+ sendReplicate(leaderActorContext, 0);
+
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
+
+ // Send reply only from the voting follower and verify consensus via ApplyState.
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+
+ MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(nonVotingFollowerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Send another Replicate message
+ sendReplicate(leaderActorContext, 1);
+
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
+ AppendEntries.class);
+ assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
+
+ // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
+ leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
+
+ MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
+
+ // Send reply from the voting follower and verify consensus.
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
+
+ MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+ }
+
+ @Test
+ public void testTransferLeadershipWithFollowerInSync() {
+ logStart("testTransferLeadershipWithFollowerInSync");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ sendReplicate(leaderActorContext, 0);
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+ MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+
+ // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
+ MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+
+ // Leader should force an election timeout
+ MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+ verify(mockTransferCohort).transferComplete();
+ }
+
+ @Test
+ public void testTransferLeadershipWithEmptyLog() {
+ logStart("testTransferLeadershipWithEmptyLog");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+
+ // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // Leader should force an election timeout
+ MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+ verify(mockTransferCohort).transferComplete();
+ }
+
+ @Test
+ public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
+ logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(200, TimeUnit.MILLISECONDS));
+
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+
+ // Sync up the follower.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+ getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
+
+ // Leader should force an election timeout
+ MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+ verify(mockTransferCohort).transferComplete();
+ }
+
+ @Test
+ public void testTransferLeadershipWithFollowerSyncTimeout() {
+ logStart("testTransferLeadershipWithFollowerSyncTimeout");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(200, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ sendReplicate(leaderActorContext, 0);
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+
+ // Send heartbeats to time out the transfer.
+ for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+ getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+ }
+
+ verify(mockTransferCohort).abortTransfer();
+ verify(mockTransferCohort, never()).transferComplete();
+ MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
+ }
+
+ @Override
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
+ ActorRef actorRef, RaftRPC rpc) throws Exception {
+ super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
+ assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
+ }
+
+ private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
+
+ private final long electionTimeOutIntervalMillis;
+ private final int snapshotChunkSize;
+
+ public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
+ super();
+ this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
+ this.snapshotChunkSize = snapshotChunkSize;
+ }
+
+ @Override
+ public FiniteDuration getElectionTimeOutInterval() {
+ return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
+ }