+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ for (int i = 0; i < 3; i++) {
+ Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+ }
+
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
+ }
+
+ @Test
+ public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
+ logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+ sendReplicate(actorContext, lastIndex + 1);
+
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+
+ assertEquals(0, allMessages.get(0).getEntries().size());
+ assertEquals(1, allMessages.get(1).getEntries().size());
+ }
+
+
+ @Test
+ public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
+ logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
+
+ MockRaftActorContext actorContext = createActorContext();
+
+ leader = new Leader(actorContext);
+
+ actorContext.setLastApplied(0);
+
+ long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
+ long term = actorContext.getTermInformation().getCurrentTerm();
+ ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
+ newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
+
+ actorContext.getReplicatedLog().append(newEntry);
+
+ final Identifier id = new MockIdentifier("state-id");
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
+
+ // State should not change
+ assertTrue(raftBehavior instanceof Leader);
+
+ assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
+
+ // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
+ // one since lastApplied state is 0.
+ List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
+ leaderActor, ApplyState.class);
+ assertEquals("ApplyState count", newLogIndex, applyStateList.size());
+
+ for (int i = 0; i <= newLogIndex - 1; i++ ) {
+ ApplyState applyState = applyStateList.get(i);
+ assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
+ }
+
+ ApplyState last = applyStateList.get((int) newLogIndex - 1);
+ assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
+ assertEquals("getIdentifier", id, last.getIdentifier());
+ }
+
+ @Test
+ public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
+ logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
+
+ final MockRaftActorContext actorContext = createActorContextWithFollower();
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
+
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
+ final int snapshotTerm = 1;
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.setCommitIndex(commitIndex);
+ //set follower timeout to 2 mins, helps during debugging
+ actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
+
+ leader = new Leader(actorContext);
+
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
+ //update follower timestamp
+ leader.markFollowerActive(FOLLOWER_ID);
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm));
+ LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
+ actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
+ fts.setSnapshotBytes(bs);
+ leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
+
+ //send first chunk and no InstallSnapshotReply received yet
+ fts.getNextChunk();
+ fts.incrementChunkIndex();
+
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
+
+ //InstallSnapshotReply received
+ fts.markSendStatus(true);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+
+ assertEquals(commitIndex, is.getLastIncludedIndex());
+ }
+
+ @Test
+ public void testSendAppendEntriesSnapshotScenario() throws Exception {
+ logStart("testSendAppendEntriesSnapshotScenario");
+
+ final MockRaftActorContext actorContext = createActorContextWithFollower();
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // new entry
+ SimpleReplicatedLogEntry entry =
+ new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
+
+ actorContext.getReplicatedLog().append(entry);
+
+ //update follower timestamp
+ leader.markFollowerActive(FOLLOWER_ID);
+
+ // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+ RaftActorBehavior raftBehavior = leader.handleMessage(
+ leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+
+ assertTrue(raftBehavior instanceof Leader);
+
+ assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());