+ @Test
+ public void testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot() {
+ testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot starting");
+
+ setup();
+
+ sendInitialPayloadsReplicatedToAllFollowers("zero", "one");
+
+ // Configure follower 2 to drop messages and lag.
+ follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+ // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
+ Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
+ TimeUnit.MILLISECONDS);
+
+ // Send 5 payloads - the second should cause a leader snapshot.
+ final MockPayload payload2 = sendPayloadData(leaderActor, "two");
+ final MockPayload payload3 = sendPayloadData(leaderActor, "three");
+ final MockPayload payload4 = sendPayloadData(leaderActor, "four");
+ final MockPayload payload5 = sendPayloadData(leaderActor, "five");
+ final MockPayload payload6 = sendPayloadData(leaderActor, "six");
+
+ MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+ // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
+ List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 5);
+ verifyApplyState(applyStates.get(0), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
+ verifyApplyState(applyStates.get(2), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
+ verifyApplyState(applyStates.get(4), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
+
+ MessageCollectorActor.clearMessages(leaderCollectorActor);
+
+ testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot: "
+ + "sending 1 more payload to trigger second snapshot");
+
+ // Send another payload to trigger a second leader snapshot.
+ MockPayload payload7 = sendPayloadData(leaderActor, "seven");
+
+ MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+ ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
+ verifyApplyState(applyState, leaderCollectorActor, payload7.toString(), currentTerm, 7, payload7);
+
+ // Verify follower 1 applies each log entry.
+ applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 6);
+ verifyApplyState(applyStates.get(0), null, null, currentTerm, 2, payload2);
+ verifyApplyState(applyStates.get(2), null, null, currentTerm, 4, payload4);
+ verifyApplyState(applyStates.get(5), null, null, currentTerm, 7, payload7);
+
+ // The snapshot should have caused the leader to advanced the snapshot index to the leader's last
+ // applied index (6) since the log size should have exceed the snapshot batch count (4).
+ // replicatedToAllIndex should remain at 1 since follower 2 is lagging.
+ verifyLeadersTrimmedLog(7, 1);
+
+ expSnapshotState.add(payload2);
+ expSnapshotState.add(payload3);
+ expSnapshotState.add(payload4);
+ expSnapshotState.add(payload5);
+ expSnapshotState.add(payload6);
+
+ MessageCollectorActor.clearMessages(leaderCollectorActor);
+ MessageCollectorActor.clearMessages(follower1CollectorActor);
+
+ // Send a server config change to test that the install snapshot includes the server config.
+
+ ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo(leaderId, true),
+ new ServerInfo(follower1Id, false),
+ new ServerInfo(follower2Id, false)));
+ leaderContext.updatePeerIds(serverConfig);
+ ((AbstractLeader)leader).updateMinReplicaCount();
+ leaderActor.tell(serverConfig, ActorRef.noSender());
+
+ applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
+ verifyApplyState(applyState, leaderCollectorActor, "serverConfig", currentTerm, 8, serverConfig);
+
+ applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
+ verifyApplyState(applyState, null, null, currentTerm, 8, serverConfig);
+
+ // Verify the leader's persisted snapshot.
+ List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
+ assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
+ verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7);
+ List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
+ assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
+ verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
+
+ expSnapshotState.add(payload7);
+
+ verifyInstallSnapshotToLaggingFollower(8, serverConfig);
+
+ testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot complete");
+ }
+
+ /**
+ * Send payloads with follower 2 lagging with the last payload having a large enough size to trigger a
+ * leader snapshot such that the leader trims its log from the last applied index.. Follower 2's log will
+ * be behind by several entries and, when it is resumed, it should be caught up via a snapshot installed
+ * by the leader.
+ */
+ @Test
+ public void testLeaderSnapshotTriggeredByMemoryThresholdExceededWithLaggingFollower() {
+ testLog.info("testLeaderSnapshotTriggeredByMemoryThresholdExceededWithLaggingFollower starting");
+
+ snapshotBatchCount = 5;
+ setup();
+
+ sendInitialPayloadsReplicatedToAllFollowers("zero");