2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
13 import akka.actor.ActorRef;
14 import akka.persistence.SaveSnapshotSuccess;
15 import com.google.common.collect.ImmutableMap;
16 import com.google.common.util.concurrent.Uninterruptibles;
17 import java.util.Arrays;
18 import java.util.HashSet;
19 import java.util.List;
22 import java.util.concurrent.TimeUnit;
23 import javax.annotation.Nullable;
24 import org.apache.commons.lang3.SerializationUtils;
25 import org.junit.Assert;
26 import org.junit.Test;
27 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
28 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
30 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
31 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
32 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
33 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
34 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
36 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
37 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
38 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
39 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
40 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
41 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
42 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
43 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
44 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
48 * Tests replication and snapshots end-to-end using real RaftActors and behavior communication with a lagging follower.
51 * @author Thomas Pantelis
53 public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends AbstractRaftActorIntegrationTest {
55 private void setup() {
// Builds the fixture the tests in this class rely on: one leader and two follower test raft actors,
// with their contexts, current behaviors and message-collector actors cached in fields so the
// test methods can assert against them.
56 leaderId = factory.generateActorId("leader");
57 follower1Id = factory.generateActorId("follower");
58 follower2Id = factory.generateActorId("follower");
60 // Setup the persistent journal for the leader - just an election term and no journal/snapshots.
61 InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId));
63 // Create the leader and 2 follower actors.
// The followers are created first; each one's peer map names the other two members by test actor path.
64 follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
65 follower2Id, testActorPath(follower2Id)), newFollowerConfigParams());
67 follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
68 follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
// The leader's peer map is built from the paths of the follower actors that were just created.
70 Map<String, String> leaderPeerAddresses = ImmutableMap.<String, String>builder()
71 .put(follower1Id, follower1Actor.path().toString())
72 .put(follower2Id, follower2Actor.path().toString()).build();
// leaderConfigParams is kept in a field; the tests read its election timeout interval later.
74 leaderConfigParams = newLeaderConfigParams();
75 leaderActor = newTestRaftActor(leaderId, leaderPeerAddresses, leaderConfigParams);
// Block until the actor created with the leader id has actually taken leadership
// (presumably what waitUntilLeader does - it is defined outside this view).
77 waitUntilLeader(leaderActor);
79 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
80 leader = leaderActor.underlyingActor().getCurrentBehavior();
82 follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
83 follower1 = follower1Actor.underlyingActor().getCurrentBehavior();
85 follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
86 follower2 = follower2Actor.underlyingActor().getCurrentBehavior();
// The journal was seeded with initialTerm; the term read back from the leader must be strictly
// greater than that seed.
// NOTE(review): assertTrue(message, condition) would express this more directly than
// assertEquals(message, true, condition).
88 currentTerm = leaderContext.getTermInformation().getCurrentTerm();
89 assertEquals("Current term > " + initialTerm, true, currentTerm > initialTerm);
// Collector actors record the messages each raft actor handles; the tests poll them for
// ApplyState, SaveSnapshotSuccess, AppendEntriesReply and similar messages.
91 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
92 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
93 follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
95 testLog.info("Leader created and elected");
99 * Send 2 payload instances with follower 2 lagging, then resume the follower and verify it gets
100 * caught up via AppendEntries.
103 public void testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries() {
// Scenario: follower 2 drops AppendEntries while two payloads are committed by the leader and
// follower 1. Once follower 2 stops dropping, it must apply both entries and must never have
// been sent an InstallSnapshot.
104 testLog.info("testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries starting: sending 2 new payloads");
108 // Simulate lagging by dropping AppendEntries messages in follower 2.
109 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
111 // Send the payloads.
112 MockPayload payload0 = sendPayloadData(leaderActor, "zero");
113 MockPayload payload1 = sendPayloadData(leaderActor, "one");
115 // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
116 List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
117 verifyApplyState(applyStates.get(0), leaderCollectorActor, payload0.toString(), currentTerm, 0, payload0);
118 verifyApplyState(applyStates.get(1), leaderCollectorActor, payload1.toString(), currentTerm, 1, payload1);
120 // Verify follower 1 applies each log entry.
// On followers the 2nd and 3rd verifyApplyState arguments are passed as null (on the leader they
// are the collector actor and the payload's string form).
121 applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 2);
122 verifyApplyState(applyStates.get(0), null, null, currentTerm, 0, payload0);
123 verifyApplyState(applyStates.get(1), null, null, currentTerm, 1, payload1);
125 // Ensure there's at least 1 more heartbeat.
// Clearing first means the reply awaited next must have arrived after this point.
126 MessageCollectorActor.clearMessages(leaderCollectorActor);
127 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
129 // The leader should not have performed fake snapshots to trim the log because the entries have not
130 // been replicated to follower 2.
131 assertEquals("Leader snapshot term", -1, leaderContext.getReplicatedLog().getSnapshotTerm());
132 assertEquals("Leader snapshot index", -1, leaderContext.getReplicatedLog().getSnapshotIndex());
133 assertEquals("Leader journal log size", 2, leaderContext.getReplicatedLog().size());
134 assertEquals("Leader journal last index", 1, leaderContext.getReplicatedLog().lastIndex());
135 assertEquals("Leader commit index", 1, leaderContext.getCommitIndex());
136 assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
137 assertEquals("Leader replicatedToAllIndex", -1, leader.getReplicatedToAllIndex());
140 "testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries: new entries applied - resuming follower {}",
143 // Now stop dropping AppendEntries in follower 2.
144 follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
146 // Verify follower 2 applies each log entry.
147 applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 2);
148 verifyApplyState(applyStates.get(0), null, null, currentTerm, 0, payload0);
149 verifyApplyState(applyStates.get(1), null, null, currentTerm, 1, payload1);
151 // Ensure there's at least 1 more heartbeat.
152 MessageCollectorActor.clearMessages(leaderCollectorActor);
153 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
155 // The leader should now have performed fake snapshots to trim the log.
156 verifyLeadersTrimmedLog(1);
158 // Even though follower 2 lagged behind, the leader should not have tried to install a snapshot
159 // to catch it up because no snapshotting was done so the follower's next index was present in the log.
// getFirstMatching is used instead of expectFirstMatching because absence is the expected
// outcome here: the result is asserted to be null.
160 InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor,
161 InstallSnapshot.class);
162 Assert.assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot);
164 testLog.info("testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries complete");
168 * Send payloads to trigger a leader snapshot due to snapshotBatchCount reached with follower 2
169 * lagging but not enough for the leader to trim its log from the last applied index. Follower 2's log
170 * will be behind by several entries and, when it is resumed, it should be caught up via AppendEntries
171 * sent by the leader.
174 public void testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries() {
// Scenario: after two fully replicated payloads, follower 2 starts dropping AppendEntries while
// four more payloads (indexes 2-5) are committed and the leader takes one snapshot. The leader's
// log is not trimmed past follower 2's position, so the follower is expected to catch up through
// AppendEntries alone, with no InstallSnapshot.
175 testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries starting");
// The two initial payloads presumably occupy indexes 0 and 1: the next payload sent below is
// verified at index 2.
179 sendInitialPayloadsReplicatedToAllFollowers("zero", "one");
181 // Configure follower 2 to drop messages and lag.
182 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
184 // Send the first payload and verify it gets applied by the leader and follower 1.
185 MockPayload payload2 = sendPayloadData(leaderActor, "two");
187 ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
188 verifyApplyState(applyState, leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
190 applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
191 verifyApplyState(applyState, null, null, currentTerm, 2, payload2);
// expSnapshotState is a field declared outside this view; the tests append each applied payload
// to it (presumably the state expected in later snapshots - confirm against the base class).
193 expSnapshotState.add(payload2);
// Clear both collectors so the ApplyState counts asserted below cover only payloads 3-5.
195 MessageCollectorActor.clearMessages(leaderCollectorActor);
196 MessageCollectorActor.clearMessages(follower1CollectorActor);
198 // Send another payload - this should cause a snapshot due to snapshotBatchCount reached.
199 MockPayload payload3 = sendPayloadData(leaderActor, "three");
201 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
203 testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries: sending 2 more payloads");
205 // Send 2 more payloads - not enough to trigger another snapshot.
206 MockPayload payload4 = sendPayloadData(leaderActor, "four");
207 MockPayload payload5 = sendPayloadData(leaderActor, "five");
209 // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
210 List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
211 verifyApplyState(applyStates.get(0), leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
212 verifyApplyState(applyStates.get(1), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
213 verifyApplyState(applyStates.get(2), leaderCollectorActor, payload5.toString(), currentTerm, 5, payload5);
215 // Verify follower 1 applies each log entry.
216 applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
217 verifyApplyState(applyStates.get(0), null, null, currentTerm, 3, payload3);
218 verifyApplyState(applyStates.get(1), null, null, currentTerm, 4, payload4);
219 verifyApplyState(applyStates.get(2), null, null, currentTerm, 5, payload5);
221 // The snapshot should have caused the leader to advance the snapshot index to the
222 // last previously applied index (1) that was replicated to all followers at the time of capture.
223 // Note: since the log size (3) did not exceed the snapshot batch count (4), the leader should not
224 // have trimmed the log to the last index actually applied (5).
225 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
226 assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
227 assertEquals("Leader journal log size", 4, leaderContext.getReplicatedLog().size());
228 assertEquals("Leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
229 assertEquals("Leader commit index", 5, leaderContext.getCommitIndex());
230 assertEquals("Leader last applied", 5, leaderContext.getLastApplied());
231 assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
233 // Now stop dropping AppendEntries in follower 2.
234 follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
236 // Verify follower 2 applies each log entry. The leader should not install a snapshot b/c
237 // follower 2's next index (3) is still present in the log.
// Follower 2's collector was never cleared in this test, hence four ApplyStates (indexes 2-5).
238 applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 4);
239 verifyApplyState(applyStates.get(0), null, null, currentTerm, 2, payload2);
240 verifyApplyState(applyStates.get(1), null, null, currentTerm, 3, payload3);
241 verifyApplyState(applyStates.get(2), null, null, currentTerm, 4, payload4);
242 verifyApplyState(applyStates.get(3), null, null, currentTerm, 5, payload5);
244 // Verify the leader did not try to install a snapshot to catch up follower 2.
245 InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor,
246 InstallSnapshot.class);
247 Assert.assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot);
249 // Ensure there's at least 1 more heartbeat.
250 MessageCollectorActor.clearMessages(leaderCollectorActor);
251 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
253 // The leader should now have performed fake snapshots to advance the snapshot index and to trim
254 // the log. In addition replicatedToAllIndex should've advanced.
255 verifyLeadersTrimmedLog(5);
257 // Verify the leader's persisted snapshot.
// Exactly one snapshot is expected, carrying a single unapplied entry: index 3 (payload3), the
// payload whose arrival triggered the capture. The numeric verifySnapshot arguments presumably
// are (last applied term, last applied index, last term, last index) - confirm against the helper.
258 List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
259 assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
260 verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 2, currentTerm, 3);
261 List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
262 assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
263 verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
265 // Verify follower 1's log and snapshot indexes.
// Wait for a fresh AppendEntries to reach the follower before inspecting its log.
266 MessageCollectorActor.clearMessages(follower1CollectorActor);
267 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
268 verifyFollowersTrimmedLog(1, follower1Actor, 5);
270 // Verify follower 2's log and snapshot indexes.
271 MessageCollectorActor.clearMessages(follower2CollectorActor);
272 MessageCollectorActor.expectFirstMatching(follower2CollectorActor, AppendEntries.class);
273 verifyFollowersTrimmedLog(2, follower2Actor, 5);
275 MessageCollectorActor.clearMessages(leaderCollectorActor);
276 MessageCollectorActor.clearMessages(follower1CollectorActor);
277 MessageCollectorActor.clearMessages(follower2CollectorActor);
279 expSnapshotState.add(payload3);
280 expSnapshotState.add(payload4);
281 expSnapshotState.add(payload5);
283 testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries complete");
287 * Send payloads to trigger a leader snapshot due to snapshotBatchCount reached with follower 2
288 * lagging where the leader trims its log from the last applied index. Follower 2's log
289 * will be behind by several entries and, when it is resumed, it should be caught up via a snapshot
290 * installed by the leader.
293 public void testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot() {
// Scenario: after two fully replicated payloads, follower 2 starts dropping AppendEntries and is
// left idle past the election timeout. Six payloads (indexes 2-7) and a server configuration
// change (index 8) are then committed by the leader and follower 1, with two leader snapshots
// along the way. The final helper call resumes follower 2 and checks that it is caught up by an
// installed snapshot that includes the server configuration.
294 testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot starting");
298 sendInitialPayloadsReplicatedToAllFollowers("zero", "one");
300 // Configure follower 2 to drop messages and lag.
301 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
303 // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
304 Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
305 TimeUnit.MILLISECONDS);
307 // Send 5 payloads - the second should cause a leader snapshot.
308 final MockPayload payload2 = sendPayloadData(leaderActor, "two");
309 final MockPayload payload3 = sendPayloadData(leaderActor, "three");
310 final MockPayload payload4 = sendPayloadData(leaderActor, "four");
311 final MockPayload payload5 = sendPayloadData(leaderActor, "five");
312 final MockPayload payload6 = sendPayloadData(leaderActor, "six");
314 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
316 // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
// All five ApplyStates must arrive, but only positions 0, 2 and 4 are inspected in detail.
317 List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 5);
318 verifyApplyState(applyStates.get(0), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
319 verifyApplyState(applyStates.get(2), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
320 verifyApplyState(applyStates.get(4), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
// Cleared so the SaveSnapshotSuccess awaited below has to come from the second snapshot.
322 MessageCollectorActor.clearMessages(leaderCollectorActor);
324 testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot: "
325 + "sending 1 more payload to trigger second snapshot");
327 // Send another payload to trigger a second leader snapshot.
328 MockPayload payload7 = sendPayloadData(leaderActor, "seven");
330 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
332 ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
333 verifyApplyState(applyState, leaderCollectorActor, payload7.toString(), currentTerm, 7, payload7);
335 // Verify follower 1 applies each log entry.
// Follower 1's collector has not been cleared since the lag began, so it holds six ApplyStates
// (indexes 2-7); positions 0, 2 and 5 are inspected.
336 applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 6);
337 verifyApplyState(applyStates.get(0), null, null, currentTerm, 2, payload2);
338 verifyApplyState(applyStates.get(2), null, null, currentTerm, 4, payload4);
339 verifyApplyState(applyStates.get(5), null, null, currentTerm, 7, payload7);
341 // The snapshot should have caused the leader to advance the snapshot index to the leader's last
342 // applied index (6) since the log size should have exceeded the snapshot batch count (4).
343 // replicatedToAllIndex should remain at 1 since follower 2 is lagging.
344 verifyLeadersTrimmedLog(7, 1);
346 expSnapshotState.add(payload2);
347 expSnapshotState.add(payload3);
348 expSnapshotState.add(payload4);
349 expSnapshotState.add(payload5);
350 expSnapshotState.add(payload6);
352 MessageCollectorActor.clearMessages(leaderCollectorActor);
353 MessageCollectorActor.clearMessages(follower1CollectorActor);
355 // Send a server config change to test that the install snapshot includes the server config.
// The boolean passed to ServerInfo is presumably the voting flag (true only for the leader
// here) - confirm against ServerInfo.
357 ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(Arrays.asList(
358 new ServerInfo(leaderId, true),
359 new ServerInfo(follower1Id, false),
360 new ServerInfo(follower2Id, false)));
// The new configuration is pushed into the leader's context and behavior directly before the
// payload itself is sent to the leader actor for replication.
361 leaderContext.updatePeerIds(serverConfig);
362 ((AbstractLeader)leader).updateMinReplicaCount();
363 leaderActor.tell(serverConfig, ActorRef.noSender());
365 applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
366 verifyApplyState(applyState, leaderCollectorActor, "serverConfig", currentTerm, 8, serverConfig);
368 applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
369 verifyApplyState(applyState, null, null, currentTerm, 8, serverConfig);
371 // Verify the leader's persisted snapshot.
// Only one snapshot is expected to remain in the store, carrying a single unapplied entry:
// index 7 (payload7).
372 List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
373 assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
374 verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7);
375 List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
376 assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
377 verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
379 expSnapshotState.add(payload7);
// Resume follower 2; 8 is the index of the server config entry, i.e. the leader's last applied index.
381 verifyInstallSnapshotToLaggingFollower(8, serverConfig);
383 testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot complete");
387 * Send payloads with follower 2 lagging with the last payload having a large enough size to trigger a
388 * leader snapshot such that the leader trims its log from the last applied index. Follower 2's log will
389 * be behind by several entries and, when it is resumed, it should be caught up via a snapshot installed by the leader.
393 public void testLeaderSnapshotTriggeredByMemoryThresholdExceededWithLaggingFollower() {
// Scenario: with follower 2 lagging, two payloads of relative size 500 and 201 are sent while the
// leader's total memory is mocked at 1000. The first must not trigger a snapshot; the second
// pushes the combined size over the threshold and must. The test then resumes follower 2 via an
// installed snapshot, checks that no further memory-triggered snapshot occurs, runs another
// replication round and finally verifies leader recovery.
394 testLog.info("testLeaderSnapshotTriggeredByMemoryThresholdExceededWithLaggingFollower starting");
// The batch count is set explicitly for this test; presumably high enough that a count-based
// snapshot does not fire before the memory-based one - confirm against the base class default.
396 snapshotBatchCount = 5;
399 sendInitialPayloadsReplicatedToAllFollowers("zero");
// Mocked total memory: the threshold comment further down computes 70% of this value (700).
401 leaderActor.underlyingActor().setMockTotalMemory(1000);
403 // We'll expect a ReplicatedLogImplEntry message and an ApplyJournalEntries message added to the journal.
404 InMemoryJournal.addWriteMessagesCompleteLatch(leaderId, 2);
406 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
408 // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
409 Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
410 TimeUnit.MILLISECONDS);
412 // Send a payload with a large relative size but not enough to trigger a snapshot.
413 MockPayload payload1 = sendPayloadData(leaderActor, "one", 500);
415 // Verify the leader got consensus and applies the first log entry even though follower 2 didn't respond.
416 List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 1);
417 verifyApplyState(applyStates.get(0), leaderCollectorActor, payload1.toString(), currentTerm, 1, payload1);
419 // Wait for all the ReplicatedLogImplEntry and ApplyJournalEntries messages to be added to the journal
420 // before the snapshot so the snapshot sequence # will be higher to ensure the snapshot gets
421 // purged from the snapshot store after subsequent snapshots.
422 InMemoryJournal.waitForWriteMessagesComplete(leaderId);
424 // Verify a snapshot is not triggered.
// getFirstMatching (not expectFirstMatching) because absence is the expected outcome.
425 CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor,
426 CaptureSnapshot.class);
427 Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
429 expSnapshotState.add(payload1);
431 // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
432 Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
433 TimeUnit.MILLISECONDS);
435 // Send another payload with a large enough relative size in combination with the last payload
436 // that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot.
// 500 + 201 = 701, i.e. just over the 700 threshold.
437 MockPayload payload2 = sendPayloadData(leaderActor, "two", 201);
439 // Verify the leader applies the last log entry.
// The leader collector was not cleared after payload1, so two ApplyStates are expected and the
// second one is payload2's.
440 applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
441 verifyApplyState(applyStates.get(1), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
443 // Verify follower 1 applies each log entry.
444 applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 2);
445 verifyApplyState(applyStates.get(0), null, null, currentTerm, 1, payload1);
446 verifyApplyState(applyStates.get(1), null, null, currentTerm, 2, payload2);
448 // A snapshot should've occurred - wait for it to complete.
449 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
451 // Because the snapshot was triggered by exceeding the memory threshold the leader should've advanced
452 // the snapshot index to the last applied index and trimmed the log even though the entries weren't
453 // replicated to all followers.
454 verifyLeadersTrimmedLog(2, 0);
456 // Verify the leader's persisted snapshot.
// One snapshot with a single unapplied entry: index 2 (payload2), the payload that triggered it.
457 List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
458 assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
459 verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 1, currentTerm, 2);
460 List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
461 assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
462 verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 2, payload2);
464 expSnapshotState.add(payload2);
// No server configuration was replicated in this test, so none is checked (null).
466 verifyInstallSnapshotToLaggingFollower(2L, null);
468 // Sends a payload with index 3.
469 verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot();
471 // Sends 3 payloads with indexes 4, 5 and 6.
472 long leadersSnapshotIndexOnRecovery = verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot();
474 // Recover the leader from persistence and verify.
// 6 is the index of the last payload sent by the helper above.
475 long leadersLastIndexOnRecovery = 6;
477 long leadersFirstJournalEntryIndexOnRecovery = leadersSnapshotIndexOnRecovery + 1;
479 verifyLeaderRecoveryAfterReinstatement(leadersLastIndexOnRecovery, leadersSnapshotIndexOnRecovery,
480 leadersFirstJournalEntryIndexOnRecovery);
// NOTE(review): this log text omits "WithLaggingFollower" from the test name and says "ending"
// where the sibling tests log "complete".
482 testLog.info("testLeaderSnapshotTriggeredByMemoryThresholdExceeded ending");
486 * Send another payload to verify another snapshot is not done since the last snapshot trimmed the
487 * first log entry so the memory threshold should not be exceeded.
489 private void verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot() {
// Sends payload "three" (expected at index 3), checks that the leader and both followers apply
// it without the leader capturing another snapshot, verifies all three logs are trimmed to
// index 3, then restores the leader's real total-memory reading and clears the collectors.
490 ApplyState applyState;
491 CaptureSnapshot captureSnapshot;
493 MockPayload payload3 = sendPayloadData(leaderActor, "three");
495 // Verify the leader applies the state.
496 applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
497 verifyApplyState(applyState, leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
// The absence check runs after the ApplyState wait, so a CaptureSnapshot collected up to the
// point the entry was applied would be seen here.
499 captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
500 Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
502 // Verify the follower 1 applies the state.
503 applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
504 verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
506 // Verify the follower 2 applies the state.
507 applyState = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
508 verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
510 // Verify the leader's state.
511 verifyLeadersTrimmedLog(3);
513 // Verify follower 1's state.
514 verifyFollowersTrimmedLog(1, follower1Actor, 3);
516 // Verify follower 2's state.
517 verifyFollowersTrimmedLog(2, follower2Actor, 3);
519 // Revert back to JVM total memory.
520 leaderActor.underlyingActor().setMockTotalMemory(0);
// Leave clean collectors for whatever verification step the caller runs next.
522 MessageCollectorActor.clearMessages(leaderCollectorActor);
523 MessageCollectorActor.clearMessages(follower1CollectorActor);
524 MessageCollectorActor.clearMessages(follower2CollectorActor);
526 expSnapshotState.add(payload3);
530 * Resume the lagging follower 2 and verify it receives an install snapshot from the leader.
532 private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex,
533 @Nullable ServerConfigurationPayload expServerConfig) {
// Stops follower 2 from dropping AppendEntries and verifies the catch-up path: the leader persists
// a snapshot at lastAppliedIndex, follower 2 receives InstallSnapshot chunks and replies to each,
// follower 2 applies the snapshot, and the leader's log ends up trimmed to lastAppliedIndex.
//
// lastAppliedIndex - index expected as both the last applied and the last index of the snapshot.
// expServerConfig  - when non-null, the server configuration expected in the leader's persisted
//                    snapshot, in the snapshot applied by follower 2 and in follower 2's context;
//                    when null those checks are skipped.
534 testLog.info("verifyInstallSnapshotToLaggingFollower starting");
536 MessageCollectorActor.clearMessages(leaderCollectorActor);
538 // Now stop dropping AppendEntries in follower 2.
539 follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
// The leader collector was cleared above, so this SaveSnapshotSuccess has to come from a
// snapshot saved after follower 2 resumed.
542 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
544 // Verify the leader's persisted snapshot. The previous snapshot (currently) won't be deleted from
545 // the snapshot store because the second snapshot was initiated by the follower install snapshot and
546 // not because the batch count was reached so the persisted journal sequence number wasn't advanced
547 // far enough to cause the previous snapshot to be deleted. This is because
548 // RaftActor#trimPersistentData subtracts the snapshotBatchCount from the snapshot's sequence number.
549 // This is OK - the next snapshot should delete it. In production, even if the system restarted
550 // before another snapshot, they would both get applied which wouldn't hurt anything.
// Consequently only the most recent snapshot (last element of the list) is inspected.
551 List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
552 Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
553 Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
554 verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
555 List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
556 assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
// Expected chunk count is the ceiling of (serialized state size / SNAPSHOT_CHUNK_SIZE).
558 int snapshotSize = SerializationUtils.serialize(persistedSnapshot.getState()).length;
559 final int expTotalChunks = snapshotSize / SNAPSHOT_CHUNK_SIZE
560 + (snapshotSize % SNAPSHOT_CHUNK_SIZE > 0 ? 1 : 0);
// Only the first InstallSnapshot chunk is inspected; the asserted chunk index shows that
// numbering starts at 1.
562 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor,
563 InstallSnapshot.class);
564 assertEquals("InstallSnapshot getTerm", currentTerm, installSnapshot.getTerm());
565 assertEquals("InstallSnapshot getLeaderId", leaderId, installSnapshot.getLeaderId());
566 assertEquals("InstallSnapshot getChunkIndex", 1, installSnapshot.getChunkIndex());
567 assertEquals("InstallSnapshot getTotalChunks", expTotalChunks, installSnapshot.getTotalChunks());
568 assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
569 assertEquals("InstallSnapshot getLastIncludedIndex", lastAppliedIndex, installSnapshot.getLastIncludedIndex());
570 //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
// One reply per chunk is expected on the leader side, each successful and each from follower 2.
572 List<InstallSnapshotReply> installSnapshotReplies = MessageCollectorActor.expectMatching(
573 leaderCollectorActor, InstallSnapshotReply.class, expTotalChunks);
// index is not declared in the visible lines of this block; it is compared against each reply's
// chunk index and incremented - TODO confirm it starts at the first chunk index (1).
575 for (InstallSnapshotReply installSnapshotReply: installSnapshotReplies) {
576 assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
577 assertEquals("InstallSnapshotReply getChunkIndex", index++, installSnapshotReply.getChunkIndex());
578 assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
579 assertEquals("InstallSnapshotReply isSuccess", true, installSnapshotReply.isSuccess());
582 // Verify follower 2 applies the snapshot.
583 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor,
584 ApplySnapshot.class);
585 verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, lastAppliedIndex, currentTerm,
// NOTE(review): the label below says "Persisted Snapshot" although the value checked belongs to
// the snapshot applied by follower 2.
587 assertEquals("Persisted Snapshot getUnAppliedEntries size", 0,
588 applySnapshot.getSnapshot().getUnAppliedEntries().size());
590 // Wait for the snapshot to complete.
// NOTE(review): the leader collector has not been cleared since the SaveSnapshotSuccess wait near
// the top of this method, so this wait may be satisfied by that same message - confirm against
// MessageCollectorActor.expectFirstMatching.
591 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
593 // Ensure there's at least 1 more heartbeat.
594 MessageCollectorActor.clearMessages(leaderCollectorActor);
595 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
597 // The leader should now have performed fake snapshots to advance the snapshot index and to trim
598 // the log. In addition replicatedToAllIndex should've advanced.
599 verifyLeadersTrimmedLog(lastAppliedIndex);
// Server configurations are compared as sets, so the order of the ServerInfo entries is irrelevant.
601 if (expServerConfig != null) {
602 Set<ServerInfo> expServerInfo = new HashSet<>(expServerConfig.getServerConfig());
603 assertEquals("Leader snapshot server config", expServerInfo,
604 new HashSet<>(persistedSnapshot.getServerConfiguration().getServerConfig()));
606 assertEquals("Follower 2 snapshot server config", expServerInfo,
607 new HashSet<>(applySnapshot.getSnapshot().getServerConfiguration().getServerConfig()));
609 ServerConfigurationPayload follower2ServerConfig = follower2Context.getPeerServerInfo(true);
610 assertNotNull("Follower 2 server config is null", follower2ServerConfig);
612 assertEquals("Follower 2 server config", expServerInfo,
613 new HashSet<>(follower2ServerConfig.getServerConfig()));
// Leave clean collectors for whatever verification step the caller runs next.
616 MessageCollectorActor.clearMessages(leaderCollectorActor);
617 MessageCollectorActor.clearMessages(follower1CollectorActor);
618 MessageCollectorActor.clearMessages(follower2CollectorActor);
620 testLog.info("verifyInstallSnapshotToLaggingFollower complete");
624 * Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and
625 * snapshots work as expected after doing a follower snapshot. In this step we don't lag a follower.
627 private long verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot() {
// Sends payloads "four", "five" and "six" via the leader and verifies the leader's persisted snapshot,
// the ApplyState messages and trimmed logs of the leader and both followers, and the leader's persisted
// journal. Returns the leader's snapshot index: 4 if the last persisted snapshot has no unapplied
// entries, otherwise 3.
629 "verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot starting: replicatedToAllIndex: {}",
630 leader.getReplicatedToAllIndex());
632 // Send another payload - a snapshot should occur.
633 MockPayload payload4 = sendPayloadData(leaderActor, "four");
635 // Wait for the snapshot to complete.
636 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
638 ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
639 verifyApplyState(applyState, leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
641 // Verify the leader's last persisted snapshot (previous ones may not be purged yet).
642 List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
643 Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
644 // The last (fourth) payload may or may not have been applied when the snapshot is captured depending on the
645 // timing when the async persistence completes.
646 List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
647 long leadersSnapshotIndex;
648 if (unAppliedEntry.isEmpty()) {
// No unapplied entries: the snapshot was captured after payload4 was applied, so its index is 4.
649 leadersSnapshotIndex = 4;
// NOTE(review): payload4 is added to expSnapshotState before verifySnapshot in this branch but after it
// in the other branch; presumably verifySnapshot compares the snapshot state against expSnapshotState -
// confirm before reordering these statements.
650 expSnapshotState.add(payload4);
651 verifySnapshot("Persisted", persistedSnapshot, currentTerm, 4, currentTerm, 4);
// Otherwise the snapshot was captured at index 3 and carries payload4 as its single unapplied entry.
653 leadersSnapshotIndex = 3;
654 verifySnapshot("Persisted", persistedSnapshot, currentTerm, 3, currentTerm, 4);
655 assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
656 verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4);
657 expSnapshotState.add(payload4);
660 // Send a couple more payloads.
661 MockPayload payload5 = sendPayloadData(leaderActor, "five");
662 MockPayload payload6 = sendPayloadData(leaderActor, "six");
664 // Verify the leader applies the 2 log entries.
// Three ApplyState messages are awaited but only positions 1 and 2 are checked; position 0 is presumably
// the payload4 ApplyState already verified above, since the collector has not been cleared since - confirm.
665 List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
666 verifyApplyState(applyStates.get(1), leaderCollectorActor, payload5.toString(), currentTerm, 5, payload5);
667 verifyApplyState(applyStates.get(2), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
669 // Verify the leader applies a log entry for at least the last entry index.
670 verifyApplyJournalEntries(leaderCollectorActor, 6);
672 // Ensure there's at least 1 more heartbeat to trim the log.
673 MessageCollectorActor.clearMessages(leaderCollectorActor);
674 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
676 // Verify the leader's final state.
677 verifyLeadersTrimmedLog(6);
679 InMemoryJournal.dumpJournal(leaderId);
681 // Verify the leader's persisted journal log - it should only contain the last 2 ReplicatedLogEntries
682 // added after the snapshot as the persisted journal should've been purged to the snapshot
// index.
684 verifyPersistedJournal(leaderId, Arrays.asList(new SimpleReplicatedLogEntry(5, currentTerm, payload5),
685 new SimpleReplicatedLogEntry(6, currentTerm, payload6)));
687 // Verify the leader's persisted journal contains an ApplyJournalEntries whose toIndex is the last entry index (6).
688 List<ApplyJournalEntries> persistedApplyJournalEntries =
689 InMemoryJournal.get(leaderId, ApplyJournalEntries.class);
690 boolean found = false;
691 for (ApplyJournalEntries entry: persistedApplyJournalEntries) {
692 if (entry.getToIndex() == 6) {
698 Assert.assertTrue(String.format("ApplyJournalEntries with index %d not found in leader's persisted journal", 6),
701 // Verify follower 1 applies the 3 log entries.
702 applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
703 verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
704 verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
705 verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
707 // Verify follower 1's log state.
708 verifyFollowersTrimmedLog(1, follower1Actor, 6);
710 // Verify follower 2 applies the 3 log entries.
711 applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3);
712 verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
713 verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
714 verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
716 // Verify follower 2's log state.
717 verifyFollowersTrimmedLog(2, follower2Actor, 6);
// Append payload5 and payload6 to expSnapshotState now that the leader and both followers are verified.
719 expSnapshotState.add(payload5);
720 expSnapshotState.add(payload6);
722 testLog.info("verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot ending");
724 return leadersSnapshotIndex;
728 * Kill the leader actor, reinstate it and verify the recovered journal.
730 private void verifyLeaderRecoveryAfterReinstatement(long lastIndex, long snapshotIndex,
731 long firstJournalEntryIndex) {
732 testLog.info("verifyLeaderRecoveryAfterReinstatement starting: lastIndex: {}, snapshotIndex: {}, "
733 + "firstJournalEntryIndex: {}", lastIndex, snapshotIndex, firstJournalEntryIndex);
735 killActor(leaderActor);
737 leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
738 TestRaftActor testRaftActor = leaderActor.underlyingActor();
740 testRaftActor.startDropMessages(RequestVoteReply.class);
742 leaderContext = testRaftActor.getRaftActorContext();
744 testRaftActor.waitForRecoveryComplete();
746 int logSize = (int) (expSnapshotState.size() - firstJournalEntryIndex);
747 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
748 assertEquals("Leader snapshot index", snapshotIndex, leaderContext.getReplicatedLog().getSnapshotIndex());
749 assertEquals("Leader journal log size", logSize, leaderContext.getReplicatedLog().size());
750 assertEquals("Leader journal last index", lastIndex, leaderContext.getReplicatedLog().lastIndex());
751 assertEquals("Leader commit index", lastIndex, leaderContext.getCommitIndex());
752 assertEquals("Leader last applied", lastIndex, leaderContext.getLastApplied());
754 for (long i = firstJournalEntryIndex; i < expSnapshotState.size(); i++) {
755 verifyReplicatedLogEntry(leaderContext.getReplicatedLog().get(i), currentTerm, i,
756 expSnapshotState.get((int) i));
759 assertEquals("Leader applied state", expSnapshotState, testRaftActor.getState());
761 testLog.info("verifyLeaderRecoveryAfterReinstatement ending");
764 private void sendInitialPayloadsReplicatedToAllFollowers(String... data) {
// Sends each string in data as a payload via the leader, records the returned payloads in
// expSnapshotState, and verifies that the leader and both followers apply every entry and that the
// leader's log is trimmed to the last entry index. All three collectors are cleared afterwards.
// NOTE(review): the verification loops below are bounded by expSnapshotState.size() while applyStates
// is requested with data.length elements, and the list position is used as the log index. This appears
// to assume expSnapshotState is empty on entry - confirm against callers.
766 // Send the payloads.
767 for (String d: data) {
768 expSnapshotState.add(sendPayloadData(leaderActor, d));
771 int numEntries = data.length;
773 // Verify the leader got consensus and applies each log entry.
774 List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor,
775 ApplyState.class, numEntries);
776 for (int i = 0; i < expSnapshotState.size(); i++) {
777 MockPayload payload = expSnapshotState.get(i);
778 verifyApplyState(applyStates.get(i), leaderCollectorActor, payload.toString(), currentTerm, i, payload);
781 // Verify follower 1 applies each log entry.
782 applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, numEntries);
783 for (int i = 0; i < expSnapshotState.size(); i++) {
784 MockPayload payload = expSnapshotState.get(i);
785 verifyApplyState(applyStates.get(i), null, null, currentTerm, i, payload);
788 // Verify follower 2 applies each log entry.
789 applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, numEntries);
790 for (int i = 0; i < expSnapshotState.size(); i++) {
791 MockPayload payload = expSnapshotState.get(i);
792 verifyApplyState(applyStates.get(i), null, null, currentTerm, i, payload);
795 // Ensure there's at least 1 more heartbeat.
796 MessageCollectorActor.clearMessages(leaderCollectorActor);
797 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
799 // The leader should have performed fake snapshots to trim the log to the last index replicated to
// all followers.
801 verifyLeadersTrimmedLog(numEntries - 1);
// Reset all collectors so the next step starts with no previously collected messages.
803 MessageCollectorActor.clearMessages(leaderCollectorActor);
804 MessageCollectorActor.clearMessages(follower1CollectorActor);
805 MessageCollectorActor.clearMessages(follower2CollectorActor);