+ assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).index());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).term());
+ assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+ assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
+ }
+
+ @Test
+ public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
+ logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(5, TimeUnit.SECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ for (int i = 0; i < 5; i++) {
+ sendReplicate(actorContext, lastIndex + i + 1);
+ }
+
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ // We expect only 1 message to be sent because of two reasons,
+ // - an append entries reply was not received
+ // - the heartbeat interval has not expired
+ // In this scenario if multiple messages are sent they would likely be duplicates
+ assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
+ }
+
+ @Test
+ public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
+ logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(5, TimeUnit.SECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ for (int i = 0; i < 3; i++) {
+ sendReplicate(actorContext, lastIndex + i + 1);
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
+ }
+
+ // We are expecting six messages here -- a request to replicate and a consensus-reached message
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
+ for (int i = 0; i < 3; i++) {
+ assertRequestEntry(lastIndex, allMessages, i);
+ assertCommitEntry(lastIndex, allMessages, i);
+ }
+
+ // Now perform another commit, eliciting a request to persist
+ sendReplicate(actorContext, lastIndex + 3 + 1);
+ allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ // This elicits another message for request to replicate
+ assertEquals("The number of request entries collected", 7, allMessages.size());
+ assertRequestEntry(lastIndex, allMessages, 3);
+
+ sendReplicate(actorContext, lastIndex + 4 + 1);
+ allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of request entries collected", 7, allMessages.size());
+ }
+
+ private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
+ final int messageNr) {
+ final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
+ assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
+ assertEquals(List.of(), commitReq.getEntries());
+ }
+
+ private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
+ final int messageNr) {
+ final AppendEntries req = allMessages.get(2 * messageNr);
+ assertEquals(lastIndex + messageNr, req.getLeaderCommit());
+
+ final List<ReplicatedLogEntry> entries = req.getEntries();
+ assertEquals(1, entries.size());
+ assertEquals(messageNr + 2, entries.get(0).index());
+ }
+
+ @Test
+ public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
+ logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ sendReplicate(actorContext, lastIndex + 1);
+
+ // Wait slightly longer than heartbeat duration
+ Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+
+ assertEquals(1, allMessages.get(0).getEntries().size());
+ assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).index());
+ assertEquals(1, allMessages.get(1).getEntries().size());
+ // FIXME: weird assert
+ assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).index());
+
+ }
+
+ @Test
+ public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
+ logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ for (int i = 0; i < 3; i++) {
+ Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+ }
+
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
+ }
+
+ @Test
+ public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
+ logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+ sendReplicate(actorContext, lastIndex + 1);
+
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+
+ assertEquals(0, allMessages.get(0).getEntries().size());
+ assertEquals(1, allMessages.get(1).getEntries().size());