+ @Test
+ public void testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot() throws Exception {
+ testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot starting");
+
+ setup();
+
+ sendInitialPayloadsReplicatedToAllFollowers("zero", "one");
+
+ // Configure follower 2 to drop messages and lag.
+ follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+ // Send 5 payloads - the second should cause a leader snapshot.
+ MockPayload payload2 = sendPayloadData(leaderActor, "two");
+ MockPayload payload3 = sendPayloadData(leaderActor, "three");
+ MockPayload payload4 = sendPayloadData(leaderActor, "four");
+ MockPayload payload5 = sendPayloadData(leaderActor, "five");
+ MockPayload payload6 = sendPayloadData(leaderActor, "six");
+
+ MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+ // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
+ List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 5);
+ verifyApplyState(applyStates.get(0), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
+ verifyApplyState(applyStates.get(2), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
+ verifyApplyState(applyStates.get(4), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
+
+ MessageCollectorActor.clearMessages(leaderCollectorActor);
+
+ testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries: sending 1 more payload to trigger second snapshot");
+
+ // Send another payload to trigger a second leader snapshot.
+ MockPayload payload7 = sendPayloadData(leaderActor, "seven");
+
+ MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+ ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
+ verifyApplyState(applyState, leaderCollectorActor, payload7.toString(), currentTerm, 7, payload7);
+
+ // Verify follower 1 applies each log entry.
+ applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 6);
+ verifyApplyState(applyStates.get(0), null, null, currentTerm, 2, payload2);
+ verifyApplyState(applyStates.get(2), null, null, currentTerm, 4, payload4);
+ verifyApplyState(applyStates.get(5), null, null, currentTerm, 7, payload7);
+
+ // The snapshot should have caused the leader to advanced the snapshot index to the leader's last
+ // applied index (6) since the log size should have exceed the snapshot batch count (4).
+ // replicatedToAllIndex should remain at 1 since follower 2 is lagging.
+ verifyLeadersTrimmedLog(7, 1);
+
+ expSnapshotState.add(payload2);
+ expSnapshotState.add(payload3);
+ expSnapshotState.add(payload4);
+ expSnapshotState.add(payload5);
+ expSnapshotState.add(payload6);
+
+ // Verify the leader's persisted snapshot.
+ List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
+ assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
+ verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7);
+ List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
+ assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
+ verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
+
+ expSnapshotState.add(payload7);
+
+ verifyInstallSnapshotToLaggingFollower(7);
+
+ testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot complete");
+ }
+
+ /**
+ * Send payloads with follower 2 lagging with the last payload having a large enough size to trigger a
+ * leader snapshot such that the leader trims its log from the last applied index.. Follower 2's log will
+ * be behind by several entries and, when it is resumed, it should be caught up via a snapshot installed
+ * by the leader.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testLeaderSnapshotTriggeredByMemoryThresholdExceededWithLaggingFollower() throws Exception {
+ testLog.info("testLeaderSnapshotTriggeredByMemoryThresholdExceededWithLaggingFollower starting");
+
+ snapshotBatchCount = 5;
+ setup();
+
+ sendInitialPayloadsReplicatedToAllFollowers("zero");