/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.cluster.raft;

import static org.junit.Assert.assertEquals;
import akka.persistence.SaveSnapshotSuccess;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
/**
 * Tests replication and snapshots end-to-end using real RaftActors and behavior communication with a
 * lagging follower.
 *
 * @author Thomas Pantelis
 */
39 public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends AbstractRaftActorIntegrationTest {
41 private void setup() {
42 leaderId = factory.generateActorId("leader");
43 follower1Id = factory.generateActorId("follower");
44 follower2Id = factory.generateActorId("follower");
46 // Setup the persistent journal for the leader - just an election term and no journal/snapshots.
47 InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId));
49 // Create the leader and 2 follower actors.
50 follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
51 follower2Id, testActorPath(follower2Id)), newFollowerConfigParams());
53 follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
54 follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
56 Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().
57 put(follower1Id, follower1Actor.path().toString()).
58 put(follower2Id, follower2Actor.path().toString()).build();
60 leaderConfigParams = newLeaderConfigParams();
61 leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
63 waitUntilLeader(leaderActor);
65 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
66 leader = leaderActor.underlyingActor().getCurrentBehavior();
68 follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
69 follower1 = follower1Actor.underlyingActor().getCurrentBehavior();
71 follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
72 follower2 = follower2Actor.underlyingActor().getCurrentBehavior();
74 currentTerm = leaderContext.getTermInformation().getCurrentTerm();
75 assertEquals("Current term > " + initialTerm, true, currentTerm > initialTerm);
77 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
78 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
79 follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
81 testLog.info("Leader created and elected");
85 * Send 2 payload instances with follower 2 lagging then resume the follower and verifies it gets
86 * caught up via AppendEntries.
89 public void testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries() throws Exception {
90 testLog.info("testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries starting: sending 2 new payloads");
94 // Simulate lagging by dropping AppendEntries messages in follower 2.
95 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
98 MockPayload payload0 = sendPayloadData(leaderActor, "zero");
99 MockPayload payload1 = sendPayloadData(leaderActor, "one");
101 // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
102 List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
103 verifyApplyState(applyStates.get(0), leaderCollectorActor, payload0.toString(), currentTerm, 0, payload0);
104 verifyApplyState(applyStates.get(1), leaderCollectorActor, payload1.toString(), currentTerm, 1, payload1);
106 // Verify follower 1 applies each log entry.
107 applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 2);
108 verifyApplyState(applyStates.get(0), null, null, currentTerm, 0, payload0);
109 verifyApplyState(applyStates.get(1), null, null, currentTerm, 1, payload1);
111 // Ensure there's at least 1 more heartbeat.
112 MessageCollectorActor.clearMessages(leaderCollectorActor);
113 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
115 // The leader should not have performed fake snapshots to trim the log because the entries have not
116 // been replicated to follower 2.
117 assertEquals("Leader snapshot term", -1, leaderContext.getReplicatedLog().getSnapshotTerm());
118 assertEquals("Leader snapshot index", -1, leaderContext.getReplicatedLog().getSnapshotIndex());
119 assertEquals("Leader journal log size", 2, leaderContext.getReplicatedLog().size());
120 assertEquals("Leader journal last index", 1, leaderContext.getReplicatedLog().lastIndex());
121 assertEquals("Leader commit index", 1, leaderContext.getCommitIndex());
122 assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
123 assertEquals("Leader replicatedToAllIndex", -1, leader.getReplicatedToAllIndex());
125 testLog.info("testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries: new entries applied - resuming follower {}", follower2Id);
127 // Now stop dropping AppendEntries in follower 2.
128 follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
130 // Verify follower 2 applies each log entry.
131 applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 2);
132 verifyApplyState(applyStates.get(0), null, null, currentTerm, 0, payload0);
133 verifyApplyState(applyStates.get(1), null, null, currentTerm, 1, payload1);
135 // Ensure there's at least 1 more heartbeat.
136 MessageCollectorActor.clearMessages(leaderCollectorActor);
137 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
139 // The leader should now have performed fake snapshots to trim the log.
140 verifyLeadersTrimmedLog(1);
142 // Even though follower 2 lagged behind, the leader should not have tried to install a snapshot
143 // to catch it up because no snapshotting was done so the follower's next index was present in the log.
144 InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor,
145 InstallSnapshot.class);
146 Assert.assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot);
148 testLog.info("testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries complete");
152 * Send payloads to trigger a leader snapshot due to snapshotBatchCount reached with follower 2
153 * lagging but not enough for the leader to trim its log from the last applied index. Follower 2's log
154 * will be behind by several entries and, when it is resumed, it should be caught up via AppendEntries
155 * sent by the leader.
160 public void testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries() throws Exception {
161 testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries starting");
165 sendInitialPayloadsReplicatedToAllFollowers("zero", "one");
167 // Configure follower 2 to drop messages and lag.
168 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
170 // Send the first payload and verify it gets applied by the leader and follower 1.
171 MockPayload payload2 = sendPayloadData(leaderActor, "two");
173 ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
174 verifyApplyState(applyState, leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
176 applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
177 verifyApplyState(applyState, null, null, currentTerm, 2, payload2);
179 expSnapshotState.add(payload2);
181 MessageCollectorActor.clearMessages(leaderCollectorActor);
182 MessageCollectorActor.clearMessages(follower1CollectorActor);
184 // Send another payload - this should cause a snapshot due to snapshotBatchCount reached.
185 MockPayload payload3 = sendPayloadData(leaderActor, "three");
187 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
189 testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries: sending 2 more payloads");
191 // Send 2 more payloads - not enough to trigger another snapshot.
192 MockPayload payload4 = sendPayloadData(leaderActor, "four");
193 MockPayload payload5 = sendPayloadData(leaderActor, "five");
195 // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
196 List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
197 verifyApplyState(applyStates.get(0), leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
198 verifyApplyState(applyStates.get(1), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
199 verifyApplyState(applyStates.get(2), leaderCollectorActor, payload5.toString(), currentTerm, 5, payload5);
201 // Verify follower 1 applies each log entry.
202 applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
203 verifyApplyState(applyStates.get(0), null, null, currentTerm, 3, payload3);
204 verifyApplyState(applyStates.get(1), null, null, currentTerm, 4, payload4);
205 verifyApplyState(applyStates.get(2), null, null, currentTerm, 5, payload5);
207 // The snapshot should have caused the leader to advanced the snapshot index to the
208 // last previously applied index (1) that was replicated to all followers at the time of capture.
209 // Note: since the log size (3) did not exceed the snapshot batch count (4), the leader should not
210 // have trimmed the log to the last index actually applied (5).
211 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
212 assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
213 assertEquals("Leader journal log size", 4, leaderContext.getReplicatedLog().size());
214 assertEquals("Leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
215 assertEquals("Leader commit index", 5, leaderContext.getCommitIndex());
216 assertEquals("Leader last applied", 5, leaderContext.getLastApplied());
217 assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
219 // Now stop dropping AppendEntries in follower 2.
220 follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
222 // Verify follower 2 applies each log entry. The leader should not install a snapshot b/c
223 // follower 2's next index (3) is still present in the log.
224 applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 4);
225 verifyApplyState(applyStates.get(0), null, null, currentTerm, 2, payload2);
226 verifyApplyState(applyStates.get(1), null, null, currentTerm, 3, payload3);
227 verifyApplyState(applyStates.get(2), null, null, currentTerm, 4, payload4);
228 verifyApplyState(applyStates.get(3), null, null, currentTerm, 5, payload5);
230 // Verify the leader did not try to install a snapshot to catch up follower 2.
231 InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor, InstallSnapshot.class);
232 Assert.assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot);
234 // Ensure there's at least 1 more heartbeat.
235 MessageCollectorActor.clearMessages(leaderCollectorActor);
236 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
238 // The leader should now have performed fake snapshots to advance the snapshot index and to trim
239 // the log. In addition replicatedToAllIndex should've advanced.
240 verifyLeadersTrimmedLog(5);
242 // Verify the leader's persisted snapshot.
243 List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
244 assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
245 verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 2, currentTerm, 3);
246 List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
247 assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
248 verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
250 // Verify follower 1's log and snapshot indexes.
251 MessageCollectorActor.clearMessages(follower1CollectorActor);
252 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
253 verifyFollowersTrimmedLog(1, follower1Actor, 5);
255 // Verify follower 2's log and snapshot indexes.
256 MessageCollectorActor.clearMessages(follower2CollectorActor);
257 MessageCollectorActor.expectFirstMatching(follower2CollectorActor, AppendEntries.class);
258 verifyFollowersTrimmedLog(2, follower2Actor, 5);
260 MessageCollectorActor.clearMessages(leaderCollectorActor);
261 MessageCollectorActor.clearMessages(follower1CollectorActor);
262 MessageCollectorActor.clearMessages(follower2CollectorActor);
264 expSnapshotState.add(payload3);
265 expSnapshotState.add(payload4);
266 expSnapshotState.add(payload5);
268 testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries complete");
272 * Send payloads to trigger a leader snapshot due to snapshotBatchCount reached with follower 2
273 * lagging where the leader trims its log from the last applied index. Follower 2's log
274 * will be behind by several entries and, when it is resumed, it should be caught up via a snapshot
275 * installed by the leader.
280 public void testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot() throws Exception {
281 testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot starting");
285 sendInitialPayloadsReplicatedToAllFollowers("zero", "one");
287 // Configure follower 2 to drop messages and lag.
288 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
290 // Send 5 payloads - the second should cause a leader snapshot.
291 MockPayload payload2 = sendPayloadData(leaderActor, "two");
292 MockPayload payload3 = sendPayloadData(leaderActor, "three");
293 MockPayload payload4 = sendPayloadData(leaderActor, "four");
294 MockPayload payload5 = sendPayloadData(leaderActor, "five");
295 MockPayload payload6 = sendPayloadData(leaderActor, "six");
297 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
299 // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
300 List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 5);
301 verifyApplyState(applyStates.get(0), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
302 verifyApplyState(applyStates.get(2), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
303 verifyApplyState(applyStates.get(4), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
305 MessageCollectorActor.clearMessages(leaderCollectorActor);
307 testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries: sending 1 more payload to trigger second snapshot");
309 // Send another payload to trigger a second leader snapshot.
310 MockPayload payload7 = sendPayloadData(leaderActor, "seven");
312 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
314 ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
315 verifyApplyState(applyState, leaderCollectorActor, payload7.toString(), currentTerm, 7, payload7);
317 // Verify follower 1 applies each log entry.
318 applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 6);
319 verifyApplyState(applyStates.get(0), null, null, currentTerm, 2, payload2);
320 verifyApplyState(applyStates.get(2), null, null, currentTerm, 4, payload4);
321 verifyApplyState(applyStates.get(5), null, null, currentTerm, 7, payload7);
323 // The snapshot should have caused the leader to advanced the snapshot index to the leader's last
324 // applied index (6) since the log size should have exceed the snapshot batch count (4).
325 // replicatedToAllIndex should remain at 1 since follower 2 is lagging.
326 verifyLeadersTrimmedLog(7, 1);
328 expSnapshotState.add(payload2);
329 expSnapshotState.add(payload3);
330 expSnapshotState.add(payload4);
331 expSnapshotState.add(payload5);
332 expSnapshotState.add(payload6);
334 // Verify the leader's persisted snapshot.
335 List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
336 assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
337 verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7);
338 List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
339 assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
340 verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
342 expSnapshotState.add(payload7);
344 verifyInstallSnapshotToLaggingFollower(7);
346 testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot complete");
350 * Send payloads with follower 2 lagging with the last payload having a large enough size to trigger a
351 * leader snapshot such that the leader trims its log from the last applied index.. Follower 2's log will
352 * be behind by several entries and, when it is resumed, it should be caught up via a snapshot installed
358 public void testLeaderSnapshotTriggeredByMemoryThresholdExceededWithLaggingFollower() throws Exception {
359 testLog.info("testLeaderSnapshotTriggeredByMemoryThresholdExceededWithLaggingFollower starting");
361 snapshotBatchCount = 5;
364 sendInitialPayloadsReplicatedToAllFollowers("zero");
366 leaderActor.underlyingActor().setMockTotalMemory(1000);
368 // We'll expect a ReplicatedLogImplEntry message and an ApplyJournalEntries message added to the journal.
369 InMemoryJournal.addWriteMessagesCompleteLatch(leaderId, 2);
371 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
373 // Send a payload with a large relative size but not enough to trigger a snapshot.
374 MockPayload payload1 = sendPayloadData(leaderActor, "one", 500);
376 // Verify the leader got consensus and applies the first log entry even though follower 2 didn't respond.
377 List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 1);
378 verifyApplyState(applyStates.get(0), leaderCollectorActor, payload1.toString(), currentTerm, 1, payload1);
380 // Wait for all the ReplicatedLogImplEntry and ApplyJournalEntries messages to be added to the journal
381 // before the snapshot so the snapshot sequence # will be higher to ensure the snapshot gets
382 // purged from the snapshot store after subsequent snapshots.
383 InMemoryJournal.waitForWriteMessagesComplete(leaderId);
385 // Verify a snapshot is not triggered.
386 CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
387 Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
389 expSnapshotState.add(payload1);
391 // Send another payload with a large enough relative size in combination with the last payload
392 // that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot.
393 MockPayload payload2 = sendPayloadData(leaderActor, "two", 201);
395 // Verify the leader applies the last log entry.
396 applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
397 verifyApplyState(applyStates.get(1), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
399 // Verify follower 1 applies each log entry.
400 applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 2);
401 verifyApplyState(applyStates.get(0), null, null, currentTerm, 1, payload1);
402 verifyApplyState(applyStates.get(1), null, null, currentTerm, 2, payload2);
404 // A snapshot should've occurred - wait for it to complete.
405 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
407 // Because the snapshot was triggered by exceeding the memory threshold the leader should've advanced
408 // the snapshot index to the last applied index and trimmed the log even though the entries weren't
409 // replicated to all followers.
410 verifyLeadersTrimmedLog(2, 0);
412 // Verify the leader's persisted snapshot.
413 List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
414 assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
415 verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 1, currentTerm, 2);
416 List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
417 assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
418 verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 2, payload2);
420 expSnapshotState.add(payload2);
422 verifyInstallSnapshotToLaggingFollower(2L);
424 // Sends a payload with index 3.
425 verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot();
427 // Sends 3 payloads with indexes 4, 5 and 6.
428 verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot();
430 // Recover the leader from persistence and verify.
431 long leadersLastIndexOnRecovery = 6;
433 // The leader's last snapshot was triggered by index 4 so the last applied index in the snapshot was 3.
434 long leadersSnapshotIndexOnRecovery = 3;
436 // The recovered journal should have 3 entries starting at index 4.
437 long leadersFirstJournalEntryIndexOnRecovery = 4;
439 verifyLeaderRecoveryAfterReinstatement(leadersLastIndexOnRecovery, leadersSnapshotIndexOnRecovery,
440 leadersFirstJournalEntryIndexOnRecovery);
442 testLog.info("testLeaderSnapshotTriggeredByMemoryThresholdExceeded ending");
446 * Send another payload to verify another snapshot is not done since the last snapshot trimmed the
447 * first log entry so the memory threshold should not be exceeded.
451 private void verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot() throws Exception {
452 ApplyState applyState;
453 CaptureSnapshot captureSnapshot;
455 MockPayload payload3 = sendPayloadData(leaderActor, "three");
457 // Verify the leader applies the state.
458 applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
459 verifyApplyState(applyState, leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
461 captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
462 Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
464 // Verify the follower 1 applies the state.
465 applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
466 verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
468 // Verify the follower 2 applies the state.
469 applyState = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
470 verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
472 // Verify the leader's state.
473 verifyLeadersTrimmedLog(3);
475 // Verify follower 1's state.
476 verifyFollowersTrimmedLog(1, follower1Actor, 3);
478 // Verify follower 2's state.
479 verifyFollowersTrimmedLog(2, follower2Actor, 3);
481 // Revert back to JVM total memory.
482 leaderActor.underlyingActor().setMockTotalMemory(0);
484 MessageCollectorActor.clearMessages(leaderCollectorActor);
485 MessageCollectorActor.clearMessages(follower1CollectorActor);
486 MessageCollectorActor.clearMessages(follower2CollectorActor);
488 expSnapshotState.add(payload3);
492 * Resume the lagging follower 2 and verify it receives an install snapshot from the leader.
496 private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex) throws Exception {
497 List<Snapshot> persistedSnapshots;
498 List<ReplicatedLogEntry> unAppliedEntry;
499 ApplySnapshot applySnapshot;
500 InstallSnapshot installSnapshot;
501 InstallSnapshotReply installSnapshotReply;
503 testLog.info("testInstallSnapshotToLaggingFollower starting");
505 // Now stop dropping AppendEntries in follower 2.
506 follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
508 installSnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, InstallSnapshot.class);
509 assertEquals("InstallSnapshot getTerm", currentTerm, installSnapshot.getTerm());
510 assertEquals("InstallSnapshot getLeaderId", leaderId, installSnapshot.getLeaderId());
511 assertEquals("InstallSnapshot getChunkIndex", 1, installSnapshot.getChunkIndex());
512 assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks());
513 assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
514 assertEquals("InstallSnapshot getLastIncludedIndex", lastAppliedIndex, installSnapshot.getLastIncludedIndex());
515 //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
517 installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class);
518 assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
519 assertEquals("InstallSnapshotReply getChunkIndex", 1, installSnapshotReply.getChunkIndex());
520 assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
521 assertEquals("InstallSnapshotReply isSuccess", true, installSnapshotReply.isSuccess());
523 // Verify follower 2 applies the snapshot.
524 applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
525 verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
526 assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size());
528 // Wait for the snapshot to complete.
529 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
531 // Ensure there's at least 1 more heartbeat.
532 MessageCollectorActor.clearMessages(leaderCollectorActor);
533 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
535 // The leader should now have performed fake snapshots to advance the snapshot index and to trim
536 // the log. In addition replicatedToAllIndex should've advanced.
537 verifyLeadersTrimmedLog(lastAppliedIndex);
539 // Verify the leader's persisted snapshot. The previous snapshot (currently) won't be deleted from
540 // the snapshot store because the second snapshot was initiated by the follower install snapshot and
541 // not because the batch count was reached so the persisted journal sequence number wasn't advanced
542 // far enough to cause the previous snapshot to be deleted. This is because
543 // RaftActor#trimPersistentData subtracts the snapshotBatchCount from the snapshot's sequence number.
544 // This is OK - the next snapshot should delete it. In production, even if the system restarted
545 // before another snapshot, they would both get applied which wouldn't hurt anything.
546 persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
547 Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
548 Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
549 verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
550 unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
551 assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
553 MessageCollectorActor.clearMessages(leaderCollectorActor);
554 MessageCollectorActor.clearMessages(follower1CollectorActor);
555 MessageCollectorActor.clearMessages(follower2CollectorActor);
557 testLog.info("testInstallSnapshotToLaggingFollower complete");
561 * Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and
562 * snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower.
566 private void verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot() throws Exception {
567 List<ApplyState> applyStates;
568 ApplyState applyState;
570 testLog.info("testReplicationsAndSnapshotAfterInstallSnapshot starting: replicatedToAllIndex: {}",
571 leader.getReplicatedToAllIndex());
573 // Send another payload - a snapshot should occur.
574 MockPayload payload4 = sendPayloadData(leaderActor, "four");
576 // Wait for the snapshot to complete.
577 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
579 applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
580 verifyApplyState(applyState, leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
582 // Verify the leader's last persisted snapshot (previous ones may not be purged yet).
583 List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
584 Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
585 verifySnapshot("Persisted", persistedSnapshot, currentTerm, 3, currentTerm, 4);
586 List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
587 assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
588 verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4);
590 // Send a couple more payloads.
591 MockPayload payload5 = sendPayloadData(leaderActor, "five");
592 MockPayload payload6 = sendPayloadData(leaderActor, "six");
594 // Verify the leader applies the 2 log entries.
595 applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
596 verifyApplyState(applyStates.get(1), leaderCollectorActor, payload5.toString(), currentTerm, 5, payload5);
597 verifyApplyState(applyStates.get(2), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
599 // Verify the leader applies a log entry for at least the last entry index.
600 verifyApplyJournalEntries(leaderCollectorActor, 6);
602 // Ensure there's at least 1 more heartbeat to trim the log.
603 MessageCollectorActor.clearMessages(leaderCollectorActor);
604 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
606 // Verify the leader's final state.
607 verifyLeadersTrimmedLog(6);
609 InMemoryJournal.dumpJournal(leaderId);
611 // Verify the leader's persisted journal log - it should only contain the last 2 ReplicatedLogEntries
612 // added after the snapshot as the persisted journal should've been purged to the snapshot
614 verifyPersistedJournal(leaderId, Arrays.asList(new ReplicatedLogImplEntry(5, currentTerm, payload5),
615 new ReplicatedLogImplEntry(6, currentTerm, payload6)));
617 // Verify the leader's persisted journal contains an ApplyJournalEntries for at least the last entry index.
618 List<ApplyJournalEntries> persistedApplyJournalEntries = InMemoryJournal.get(leaderId, ApplyJournalEntries.class);
619 boolean found = false;
620 for(ApplyJournalEntries entry: persistedApplyJournalEntries) {
621 if(entry.getToIndex() == 6) {
627 Assert.assertTrue(String.format("ApplyJournalEntries with index %d not found in leader's persisted journal", 6), found);
629 // Verify follower 1 applies the 3 log entries.
630 applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
631 verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
632 verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
633 verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
635 // Verify follower 1's log state.
636 verifyFollowersTrimmedLog(1, follower1Actor, 6);
638 // Verify follower 2 applies the 3 log entries.
639 applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3);
640 verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
641 verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
642 verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
644 // Verify follower 2's log state.
645 verifyFollowersTrimmedLog(2, follower2Actor, 6);
647 expSnapshotState.add(payload4);
648 expSnapshotState.add(payload5);
649 expSnapshotState.add(payload6);
651 testLog.info("testReplicationsAndSnapshotAfterInstallSnapshot ending");
655 * Kill the leader actor, reinstate it and verify the recovered journal.
657 private void verifyLeaderRecoveryAfterReinstatement(long lastIndex, long snapshotIndex, long firstJournalEntryIndex) {
658 testLog.info("testLeaderReinstatement starting");
660 killActor(leaderActor);
662 leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
663 TestRaftActor testRaftActor = leaderActor.underlyingActor();
665 testRaftActor.startDropMessages(RequestVoteReply.class);
667 leaderContext = testRaftActor.getRaftActorContext();
669 testRaftActor.waitForRecoveryComplete();
671 int logSize = (int) (expSnapshotState.size() - firstJournalEntryIndex);
672 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
673 assertEquals("Leader snapshot index", snapshotIndex, leaderContext.getReplicatedLog().getSnapshotIndex());
674 assertEquals("Leader journal log size", logSize, leaderContext.getReplicatedLog().size());
675 assertEquals("Leader journal last index", lastIndex, leaderContext.getReplicatedLog().lastIndex());
676 assertEquals("Leader commit index", lastIndex, leaderContext.getCommitIndex());
677 assertEquals("Leader last applied", lastIndex, leaderContext.getLastApplied());
679 for(long i = firstJournalEntryIndex; i < expSnapshotState.size(); i++) {
680 verifyReplicatedLogEntry(leaderContext.getReplicatedLog().get(i), currentTerm, i,
681 expSnapshotState.get((int) i));
684 assertEquals("Leader applied state", expSnapshotState, testRaftActor.getState());
686 testLog.info("testLeaderReinstatement ending");
689 private void sendInitialPayloadsReplicatedToAllFollowers(String... data) {
691 // Send the payloads.
692 for(String d: data) {
693 expSnapshotState.add(sendPayloadData(leaderActor, d));
696 int nEntries = data.length;
698 // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
699 List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, nEntries);
700 for(int i = 0; i < expSnapshotState.size(); i++) {
701 MockPayload payload = expSnapshotState.get(i);
702 verifyApplyState(applyStates.get(i), leaderCollectorActor, payload.toString(), currentTerm, i, payload);
705 // Verify follower 1 applies each log entry.
706 applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, nEntries);
707 for(int i = 0; i < expSnapshotState.size(); i++) {
708 MockPayload payload = expSnapshotState.get(i);
709 verifyApplyState(applyStates.get(i), null, null, currentTerm, i, payload);
712 // Verify follower 2 applies each log entry.
713 applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, nEntries);
714 for(int i = 0; i < expSnapshotState.size(); i++) {
715 MockPayload payload = expSnapshotState.get(i);
716 verifyApplyState(applyStates.get(i), null, null, currentTerm, i, payload);
719 // Ensure there's at least 1 more heartbeat.
720 MessageCollectorActor.clearMessages(leaderCollectorActor);
721 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
723 // The leader should have performed fake snapshots to trim the log to the last index replicated to
725 verifyLeadersTrimmedLog(nEntries - 1);
727 MessageCollectorActor.clearMessages(leaderCollectorActor);
728 MessageCollectorActor.clearMessages(follower1CollectorActor);
729 MessageCollectorActor.clearMessages(follower2CollectorActor);