Bug 6540: Fix journal issues on leader changes
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import akka.actor.ActorRef;
13 import akka.persistence.SaveSnapshotSuccess;
14 import com.google.common.collect.ImmutableMap;
15 import com.google.common.util.concurrent.Uninterruptibles;
16 import java.util.Arrays;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Set;
21 import java.util.concurrent.TimeUnit;
22 import javax.annotation.Nullable;
23 import org.junit.Assert;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
26 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
27 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
28 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
29 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
30 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
31 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
32 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
33 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
34 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
35 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
36 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
37 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
38 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
39 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
40 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
41 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
42
43 /**
44  * Tests replication and snapshots end-to-end using real RaftActors and behavior communication with a
45  * lagging follower.
46  *
47  * @author Thomas Pantelis
48  */
49 public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends AbstractRaftActorIntegrationTest {
50     /** Creates the two followers and the leader, waits for the leader's election and caches their test state. */
51     private void setup() {
52         leaderId = factory.generateActorId("leader");
53         follower1Id = factory.generateActorId("follower");
54         follower2Id = factory.generateActorId("follower");
55
56         // Setup the persistent journal for the leader - just an election term and no journal/snapshots.
57         InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId));
58
59         // Create the 2 follower actors first; their actor paths are passed to the leader as peer addresses below.
60         follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
61                 follower2Id, testActorPath(follower2Id)), newFollowerConfigParams());
62
63         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
64                 follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
65
66         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().
67                 put(follower1Id, follower1Actor.path().toString()).
68                 put(follower2Id, follower2Actor.path().toString()).build();
69
70         leaderConfigParams = newLeaderConfigParams();
71         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
72
73         waitUntilLeader(leaderActor);
74
75         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
76         leader = leaderActor.underlyingActor().getCurrentBehavior();
77
78         follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
79         follower1 = follower1Actor.underlyingActor().getCurrentBehavior();
80
81         follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
82         follower2 = follower2Actor.underlyingActor().getCurrentBehavior();
83         // The current term must be greater than the initial term seeded into the leader's journal above.
84         currentTerm = leaderContext.getTermInformation().getCurrentTerm();
85         Assert.assertTrue("Current term > " + initialTerm, currentTerm > initialTerm);
86
87         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
88         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
89         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
90
91         testLog.info("Leader created and elected");
92     }
93
94     /**
95      * Sends 2 payloads while follower 2 drops AppendEntries, then resumes follower 2 and verifies it
96      * applies both entries without ever receiving an InstallSnapshot.
97      */
98     @Test
99     public void testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries() throws Exception {
100         testLog.info("testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries starting: sending 2 new payloads");
101
102         setup();
103
104         // Simulate lagging by dropping AppendEntries messages in follower 2.
105         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
106
107         // Send the payloads.
108         MockPayload payload0 = sendPayloadData(leaderActor, "zero");
109         MockPayload payload1 = sendPayloadData(leaderActor, "one");
110
111         // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
112         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
113         verifyApplyState(applyStates.get(0), leaderCollectorActor, payload0.toString(), currentTerm, 0, payload0);
114         verifyApplyState(applyStates.get(1), leaderCollectorActor, payload1.toString(), currentTerm, 1, payload1);
115
116         // Verify follower 1 applies each log entry.
117         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 2);
118         verifyApplyState(applyStates.get(0), null, null, currentTerm, 0, payload0);
119         verifyApplyState(applyStates.get(1), null, null, currentTerm, 1, payload1);
120
121         // Ensure there's at least 1 more heartbeat: clear the collector and wait for a fresh AppendEntriesReply.
122         MessageCollectorActor.clearMessages(leaderCollectorActor);
123         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
124
125         // The leader should not have performed fake snapshots to trim the log because the entries have not
126         // been replicated to follower 2.
127         assertEquals("Leader snapshot term", -1, leaderContext.getReplicatedLog().getSnapshotTerm());
128         assertEquals("Leader snapshot index", -1, leaderContext.getReplicatedLog().getSnapshotIndex());
129         assertEquals("Leader journal log size", 2, leaderContext.getReplicatedLog().size());
130         assertEquals("Leader journal last index", 1, leaderContext.getReplicatedLog().lastIndex());
131         assertEquals("Leader commit index", 1, leaderContext.getCommitIndex());
132         assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
133         assertEquals("Leader replicatedToAllIndex", -1, leader.getReplicatedToAllIndex());
134
135         testLog.info("testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries: new entries applied - resuming follower {}", follower2Id);
136
137         // Now stop dropping AppendEntries in follower 2.
138         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
139
140         // Verify follower 2 applies each log entry.
141         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 2);
142         verifyApplyState(applyStates.get(0), null, null, currentTerm, 0, payload0);
143         verifyApplyState(applyStates.get(1), null, null, currentTerm, 1, payload1);
144
145         // Ensure there's at least 1 more heartbeat: clear the collector and wait for a fresh AppendEntriesReply.
146         MessageCollectorActor.clearMessages(leaderCollectorActor);
147         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
148
149         // The leader should now have performed fake snapshots to trim the log.
150         verifyLeadersTrimmedLog(1);
151
152         // Even though follower 2 lagged behind, the leader should not have tried to install a snapshot
153         // to catch it up because no snapshotting was done so the follower's next index was present in the log.
154         InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor,
155                 InstallSnapshot.class);
156         Assert.assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot);
157
158         testLog.info("testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries complete");
159     }
160
161     /**
162      * Send payloads to trigger a leader snapshot due to snapshotBatchCount reached with follower 2
163      * lagging but not enough for the leader to trim its log from the last applied index. Follower 2's log
164      * will be behind by several entries and, when it is resumed, it should be caught up via AppendEntries
165      * sent by the leader.
166      *
167      * @throws Exception propagated from the invoked test helpers
168      */
169     @Test
170     public void testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries() throws Exception {
171         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries starting");
172
173         setup();
174
175         sendInitialPayloadsReplicatedToAllFollowers("zero", "one");
176
177         // Configure follower 2 to drop messages and lag.
178         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
179
180         // Send the first payload and verify it gets applied by the leader and follower 1.
181         MockPayload payload2 = sendPayloadData(leaderActor, "two");
182
183         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
184         verifyApplyState(applyState, leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
185
186         applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
187         verifyApplyState(applyState, null, null, currentTerm, 2, payload2);
188
189         expSnapshotState.add(payload2);
190
191         MessageCollectorActor.clearMessages(leaderCollectorActor);
192         MessageCollectorActor.clearMessages(follower1CollectorActor);
193
194         // Send another payload - this should cause a snapshot due to snapshotBatchCount reached.
195         MockPayload payload3 = sendPayloadData(leaderActor, "three");
196
197         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
198
199         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries: sending 2 more payloads");
200
201         // Send 2 more payloads - not enough to trigger another snapshot.
202         MockPayload payload4 = sendPayloadData(leaderActor, "four");
203         MockPayload payload5 = sendPayloadData(leaderActor, "five");
204
205         // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
206         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
207         verifyApplyState(applyStates.get(0), leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
208         verifyApplyState(applyStates.get(1), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
209         verifyApplyState(applyStates.get(2), leaderCollectorActor, payload5.toString(), currentTerm, 5, payload5);
210
211         // Verify follower 1 applies each log entry.
212         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
213         verifyApplyState(applyStates.get(0), null, null, currentTerm, 3, payload3);
214         verifyApplyState(applyStates.get(1), null, null, currentTerm, 4, payload4);
215         verifyApplyState(applyStates.get(2), null, null, currentTerm, 5, payload5);
216
217         // The snapshot should have caused the leader to advance the snapshot index to the
218         // last previously applied index (1) that was replicated to all followers at the time of capture.
219         // Note: since the log size (3) did not exceed the snapshot batch count (4), the leader should not
220         // have trimmed the log to the last index actually applied (5).
221         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
222         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
223         assertEquals("Leader journal log size", 4, leaderContext.getReplicatedLog().size());
224         assertEquals("Leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
225         assertEquals("Leader commit index", 5, leaderContext.getCommitIndex());
226         assertEquals("Leader last applied", 5, leaderContext.getLastApplied());
227         assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
228
229         // Now stop dropping AppendEntries in follower 2.
230         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
231
232         // Verify follower 2 applies each log entry. The leader should not install a snapshot b/c
233         // follower 2's next index (3) is still present in the log. NOTE(review): first entry asserted below is index 2 - confirm.
234         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 4);
235         verifyApplyState(applyStates.get(0), null, null, currentTerm, 2, payload2);
236         verifyApplyState(applyStates.get(1), null, null, currentTerm, 3, payload3);
237         verifyApplyState(applyStates.get(2), null, null, currentTerm, 4, payload4);
238         verifyApplyState(applyStates.get(3), null, null, currentTerm, 5, payload5);
239
240         // Verify the leader did not try to install a snapshot to catch up follower 2.
241         InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor, InstallSnapshot.class);
242         Assert.assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot);
243
244         // Ensure there's at least 1 more heartbeat: clear the collector and wait for a fresh AppendEntriesReply.
245         MessageCollectorActor.clearMessages(leaderCollectorActor);
246         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
247
248         // The leader should now have performed fake snapshots to advance the snapshot index and to trim
249         // the log. In addition replicatedToAllIndex should've advanced.
250         verifyLeadersTrimmedLog(5);
251
252         // Verify the leader's persisted snapshot.
253         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
254         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
255         verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 2, currentTerm, 3);
256         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
257         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
258         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
259
260         // Verify follower 1's log and snapshot indexes after it has received a fresh AppendEntries.
261         MessageCollectorActor.clearMessages(follower1CollectorActor);
262         MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
263         verifyFollowersTrimmedLog(1, follower1Actor, 5);
264
265         // Verify follower 2's log and snapshot indexes after it has received a fresh AppendEntries.
266         MessageCollectorActor.clearMessages(follower2CollectorActor);
267         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, AppendEntries.class);
268         verifyFollowersTrimmedLog(2, follower2Actor, 5);
269
270         MessageCollectorActor.clearMessages(leaderCollectorActor);
271         MessageCollectorActor.clearMessages(follower1CollectorActor);
272         MessageCollectorActor.clearMessages(follower2CollectorActor);
273
274         expSnapshotState.add(payload3);
275         expSnapshotState.add(payload4);
276         expSnapshotState.add(payload5);
277
278         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries complete");
279     }
280
281     /**
282      * Send payloads to trigger a leader snapshot due to snapshotBatchCount reached with follower 2
283      * lagging where the leader trims its log from the last applied index. Follower 2's log
284      * will be behind by several entries and, when it is resumed, it should be caught up via a snapshot
285      * installed by the leader.
286      *
287      * @throws Exception propagated from the invoked test helpers
288      */
289     @Test
290     public void testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot() throws Exception {
291         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot starting");
292
293         setup();
294
295         sendInitialPayloadsReplicatedToAllFollowers("zero", "one");
296
297         // Configure follower 2 to drop messages and lag.
298         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
299
300         // Send 5 payloads - the second should cause a leader snapshot.
301         MockPayload payload2 = sendPayloadData(leaderActor, "two");
302         MockPayload payload3 = sendPayloadData(leaderActor, "three");
303         MockPayload payload4 = sendPayloadData(leaderActor, "four");
304         MockPayload payload5 = sendPayloadData(leaderActor, "five");
305         MockPayload payload6 = sendPayloadData(leaderActor, "six");
306
307         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
308
309         // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
310         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 5);
311         verifyApplyState(applyStates.get(0), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
312         verifyApplyState(applyStates.get(2), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
313         verifyApplyState(applyStates.get(4), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
314
315         MessageCollectorActor.clearMessages(leaderCollectorActor);
316
317         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot: sending 1 more payload to trigger second snapshot");
318
319         // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
320         Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
321                 TimeUnit.MILLISECONDS);
322
323         // Send another payload to trigger a second leader snapshot.
324         MockPayload payload7 = sendPayloadData(leaderActor, "seven");
325
326         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
327
328         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
329         verifyApplyState(applyState, leaderCollectorActor, payload7.toString(), currentTerm, 7, payload7);
330
331         // Verify follower 1 applies each log entry (indexes 2 through 7; the first, third and last are spot-checked).
332         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 6);
333         verifyApplyState(applyStates.get(0), null, null, currentTerm, 2, payload2);
334         verifyApplyState(applyStates.get(2), null, null, currentTerm, 4, payload4);
335         verifyApplyState(applyStates.get(5), null, null, currentTerm, 7, payload7);
336
337         // The snapshot should have caused the leader to advance the snapshot index to the leader's last
338         // applied index (6) since the log size should have exceeded the snapshot batch count (4).
339         // replicatedToAllIndex should remain at 1 since follower 2 is lagging.
340         verifyLeadersTrimmedLog(7, 1);
341
342         expSnapshotState.add(payload2);
343         expSnapshotState.add(payload3);
344         expSnapshotState.add(payload4);
345         expSnapshotState.add(payload5);
346         expSnapshotState.add(payload6);
347
348         MessageCollectorActor.clearMessages(leaderCollectorActor);
349         MessageCollectorActor.clearMessages(follower1CollectorActor);
350
351         // Send a server config change to test that the install snapshot includes the server config.
352
353         ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(Arrays.asList(
354                 new ServerInfo(leaderId, true),
355                 new ServerInfo(follower1Id, false),
356                 new ServerInfo(follower2Id, false)));
357         leaderContext.updatePeerIds(serverConfig);
358         ((AbstractLeader)leader).updateMinReplicaCount();
359         leaderActor.tell(serverConfig, ActorRef.noSender());
360
361         applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
362         verifyApplyState(applyState, leaderCollectorActor, "serverConfig", currentTerm, 8, serverConfig);
363
364         applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
365         verifyApplyState(applyState, null, null, currentTerm, 8, serverConfig);
366
367         // Verify the leader's persisted snapshot.
368         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
369         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
370         verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7);
371         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
372         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
373         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
374
375         expSnapshotState.add(payload7);
376
377         verifyInstallSnapshotToLaggingFollower(8, serverConfig);
378
379         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot complete");
380     }
381
382     /**
383      * Send payloads with follower 2 lagging with the last payload having a large enough size to trigger a
384      * leader snapshot such that the leader trims its log from the last applied index.. Follower 2's log will
385      * be behind by several entries and, when it is resumed, it should be caught up via a snapshot installed
386      * by the leader.
387      *
388      * @throws Exception
389      */
390     @Test
391     public void testLeaderSnapshotTriggeredByMemoryThresholdExceededWithLaggingFollower() throws Exception {
392         testLog.info("testLeaderSnapshotTriggeredByMemoryThresholdExceededWithLaggingFollower starting");
393
394         snapshotBatchCount = 5;
395         setup();
396
397         sendInitialPayloadsReplicatedToAllFollowers("zero");
398
399         leaderActor.underlyingActor().setMockTotalMemory(1000);
400
401         // We'll expect a ReplicatedLogImplEntry message and an ApplyJournalEntries message added to the journal.
402         InMemoryJournal.addWriteMessagesCompleteLatch(leaderId, 2);
403
404         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
405
406         // Send a payload with a large relative size but not enough to trigger a snapshot.
407         MockPayload payload1 = sendPayloadData(leaderActor, "one", 500);
408
409         // Verify the leader got consensus and applies the first log entry even though follower 2 didn't respond.
410         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 1);
411         verifyApplyState(applyStates.get(0), leaderCollectorActor, payload1.toString(), currentTerm, 1, payload1);
412
413         // Wait for all the ReplicatedLogImplEntry and ApplyJournalEntries messages to be added to the journal
414         // before the snapshot so the snapshot sequence # will be higher to ensure the snapshot gets
415         // purged from the snapshot store after subsequent snapshots.
416         InMemoryJournal.waitForWriteMessagesComplete(leaderId);
417
418         // Verify a snapshot is not triggered.
419         CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
420         Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
421
422         expSnapshotState.add(payload1);
423
424         // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
425         Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
426                 TimeUnit.MILLISECONDS);
427
428         // Send another payload with a large enough relative size in combination with the last payload
429         // that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot.
430         MockPayload payload2 = sendPayloadData(leaderActor, "two", 201);
431
432         // Verify the leader applies the last log entry.
433         applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
434         verifyApplyState(applyStates.get(1), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
435
436         // Verify follower 1 applies each log entry.
437         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 2);
438         verifyApplyState(applyStates.get(0), null, null, currentTerm, 1, payload1);
439         verifyApplyState(applyStates.get(1), null, null, currentTerm, 2, payload2);
440
441         // A snapshot should've occurred - wait for it to complete.
442         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
443
444         // Because the snapshot was triggered by exceeding the memory threshold the leader should've advanced
445         // the snapshot index to the last applied index and trimmed the log even though the entries weren't
446         // replicated to all followers.
447         verifyLeadersTrimmedLog(2, 0);
448
449         // Verify the leader's persisted snapshot.
450         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
451         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
452         verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 1, currentTerm, 2);
453         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
454         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
455         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 2, payload2);
456
457         expSnapshotState.add(payload2);
458
459         verifyInstallSnapshotToLaggingFollower(2L, null);
460
461         // Sends a payload with index 3.
462         verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot();
463
464         // Sends 3 payloads with indexes 4, 5 and 6.
465         verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot();
466
467         // Recover the leader from persistence and verify.
468         long leadersLastIndexOnRecovery = 6;
469
470         // The leader's last snapshot was triggered by index 4 so the last applied index in the snapshot was 3.
471         long leadersSnapshotIndexOnRecovery = 3;
472
473         // The recovered journal should have 3 entries starting at index 4.
474         long leadersFirstJournalEntryIndexOnRecovery = 4;
475
476         verifyLeaderRecoveryAfterReinstatement(leadersLastIndexOnRecovery, leadersSnapshotIndexOnRecovery,
477                 leadersFirstJournalEntryIndexOnRecovery);
478
479         testLog.info("testLeaderSnapshotTriggeredByMemoryThresholdExceeded ending");
480     }
481
482     /**
483      * Sends one more payload (index 3) and verifies the leader does not capture another snapshot; the last
484      * snapshot trimmed the first log entry so the memory threshold should not be exceeded. Afterwards the
485      * leader's mocked total memory is reset to 0 and all three collector actors are cleared.
486      * @throws Exception propagated from the invoked test helpers
487      */
488     private void verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot() throws Exception {
489         ApplyState applyState;
490         CaptureSnapshot captureSnapshot;
491
492         MockPayload payload3 = sendPayloadData(leaderActor, "three");
493
494         // Verify the leader applies the state.
495         applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
496         verifyApplyState(applyState, leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
497         // No CaptureSnapshot may have been collected by the leader for this payload.
498         captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
499         Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
500
501         // Verify the follower 1 applies the state.
502         applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
503         verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
504
505         // Verify the follower 2 applies the state.
506         applyState = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
507         verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
508
509         // Verify the leader's state.
510         verifyLeadersTrimmedLog(3);
511
512         // Verify follower 1's state.
513         verifyFollowersTrimmedLog(1, follower1Actor, 3);
514
515         // Verify follower 2's state.
516         verifyFollowersTrimmedLog(2, follower2Actor, 3);
517
518         // Revert back to JVM total memory.
519         leaderActor.underlyingActor().setMockTotalMemory(0);
520
521         MessageCollectorActor.clearMessages(leaderCollectorActor);
522         MessageCollectorActor.clearMessages(follower1CollectorActor);
523         MessageCollectorActor.clearMessages(follower2CollectorActor);
524         // Record the payload in the expected snapshot state used by later verifications.
525         expSnapshotState.add(payload3);
526     }
527
528     /**
529      * Resume the lagging follower 2 and verify it receives an install snapshot from the leader.
530      *
531      * @throws Exception
532      */
533     private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex,
534             @Nullable ServerConfigurationPayload expServerConfig) throws Exception {
535         List<Snapshot> persistedSnapshots;
536         List<ReplicatedLogEntry> unAppliedEntry;
537         ApplySnapshot applySnapshot;
538         InstallSnapshot installSnapshot;
539
540         testLog.info("testInstallSnapshotToLaggingFollower starting");
541
542         MessageCollectorActor.clearMessages(leaderCollectorActor);
543
544         // Now stop dropping AppendEntries in follower 2.
545         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
546
547
548         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
549
550         // Verify the leader's persisted snapshot. The previous snapshot (currently) won't be deleted from
551         // the snapshot store because the second snapshot was initiated by the follower install snapshot and
552         // not because the batch count was reached so the persisted journal sequence number wasn't advanced
553         // far enough to cause the previous snapshot to be deleted. This is because
554         // RaftActor#trimPersistentData subtracts the snapshotBatchCount from the snapshot's sequence number.
555         // This is OK - the next snapshot should delete it. In production, even if the system restarted
556         // before another snapshot, they would both get applied which wouldn't hurt anything.
557         persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
558         Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
559         Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
560         verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
561         unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
562         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
563
564         int snapshotSize = persistedSnapshot.getState().length;
565         int expTotalChunks = (snapshotSize / SNAPSHOT_CHUNK_SIZE) + ((snapshotSize % SNAPSHOT_CHUNK_SIZE) > 0 ? 1 : 0);
566
567         installSnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, InstallSnapshot.class);
568         assertEquals("InstallSnapshot getTerm", currentTerm, installSnapshot.getTerm());
569         assertEquals("InstallSnapshot getLeaderId", leaderId, installSnapshot.getLeaderId());
570         assertEquals("InstallSnapshot getChunkIndex", 1, installSnapshot.getChunkIndex());
571         assertEquals("InstallSnapshot getTotalChunks", expTotalChunks, installSnapshot.getTotalChunks());
572         assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
573         assertEquals("InstallSnapshot getLastIncludedIndex", lastAppliedIndex, installSnapshot.getLastIncludedIndex());
574         //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
575
576         List<InstallSnapshotReply> installSnapshotReplies = MessageCollectorActor.expectMatching(
577                 leaderCollectorActor, InstallSnapshotReply.class, expTotalChunks);
578         int index = 1;
579         for(InstallSnapshotReply installSnapshotReply: installSnapshotReplies) {
580             assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
581             assertEquals("InstallSnapshotReply getChunkIndex", index++, installSnapshotReply.getChunkIndex());
582             assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
583             assertEquals("InstallSnapshotReply isSuccess", true, installSnapshotReply.isSuccess());
584         }
585
586         // Verify follower 2 applies the snapshot.
587         applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
588         verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
589         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size());
590
591         // Wait for the snapshot to complete.
592         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
593
594         // Ensure there's at least 1 more heartbeat.
595         MessageCollectorActor.clearMessages(leaderCollectorActor);
596         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
597
598         // The leader should now have performed fake snapshots to advance the snapshot index and to trim
599         // the log. In addition replicatedToAllIndex should've advanced.
600         verifyLeadersTrimmedLog(lastAppliedIndex);
601
602         if(expServerConfig != null) {
603             Set<ServerInfo> expServerInfo = new HashSet<>(expServerConfig.getServerConfig());
604             assertEquals("Leader snapshot server config", expServerInfo,
605                     new HashSet<>(persistedSnapshot.getServerConfiguration().getServerConfig()));
606
607             assertEquals("Follower 2 snapshot server config", expServerInfo,
608                     new HashSet<>(applySnapshot.getSnapshot().getServerConfiguration().getServerConfig()));
609
610             ServerConfigurationPayload follower2ServerConfig = follower2Context.getPeerServerInfo(true);
611             assertNotNull("Follower 2 server config is null", follower2ServerConfig);
612
613             assertEquals("Follower 2 server config", expServerInfo,
614                     new HashSet<>(follower2ServerConfig.getServerConfig()));
615         }
616
617         MessageCollectorActor.clearMessages(leaderCollectorActor);
618         MessageCollectorActor.clearMessages(follower1CollectorActor);
619         MessageCollectorActor.clearMessages(follower2CollectorActor);
620
621         testLog.info("testInstallSnapshotToLaggingFollower complete");
622     }
623
624     /**
625      * Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and
626      * snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower.
627      *
628      * @throws Exception
629      */
630     private void verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot() throws Exception {
631         List<ApplyState> applyStates;
632         ApplyState applyState;
633
634         testLog.info("testReplicationsAndSnapshotAfterInstallSnapshot starting: replicatedToAllIndex: {}",
635                 leader.getReplicatedToAllIndex());
636
637         // Send another payload - a snapshot should occur.
638         MockPayload payload4 = sendPayloadData(leaderActor, "four");
639
640         // Wait for the snapshot to complete.
641         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
642
643         applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
644         verifyApplyState(applyState, leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
645
646         // Verify the leader's last persisted snapshot (previous ones may not be purged yet).
647         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
648         Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
649         verifySnapshot("Persisted", persistedSnapshot, currentTerm, 3, currentTerm, 4);
650         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
651         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
652         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4);
653
654         // Send a couple more payloads.
655         MockPayload payload5 = sendPayloadData(leaderActor, "five");
656         MockPayload payload6 = sendPayloadData(leaderActor, "six");
657
658         // Verify the leader applies the 2 log entries.
659         applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
660         verifyApplyState(applyStates.get(1), leaderCollectorActor, payload5.toString(), currentTerm, 5, payload5);
661         verifyApplyState(applyStates.get(2), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
662
663         // Verify the leader applies a log entry for at least the last entry index.
664         verifyApplyJournalEntries(leaderCollectorActor, 6);
665
666         // Ensure there's at least 1 more heartbeat to trim the log.
667         MessageCollectorActor.clearMessages(leaderCollectorActor);
668         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
669
670         // Verify the leader's final state.
671         verifyLeadersTrimmedLog(6);
672
673         InMemoryJournal.dumpJournal(leaderId);
674
675         // Verify the leaders's persisted journal log - it should only contain the last 2 ReplicatedLogEntries
676         // added after the snapshot as the persisted journal should've been purged to the snapshot
677         // sequence number.
678         verifyPersistedJournal(leaderId, Arrays.asList(new ReplicatedLogImplEntry(5, currentTerm, payload5),
679                 new ReplicatedLogImplEntry(6, currentTerm, payload6)));
680
681         // Verify the leaders's persisted journal contains an ApplyJournalEntries for at least the last entry index.
682         List<ApplyJournalEntries> persistedApplyJournalEntries = InMemoryJournal.get(leaderId, ApplyJournalEntries.class);
683         boolean found = false;
684         for(ApplyJournalEntries entry: persistedApplyJournalEntries) {
685             if(entry.getToIndex() == 6) {
686                 found = true;
687                 break;
688             }
689         }
690
691         Assert.assertTrue(String.format("ApplyJournalEntries with index %d not found in leader's persisted journal", 6), found);
692
693         // Verify follower 1 applies the 3 log entries.
694         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
695         verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
696         verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
697         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
698
699         // Verify follower 1's log state.
700         verifyFollowersTrimmedLog(1, follower1Actor, 6);
701
702         // Verify follower 2 applies the 3 log entries.
703         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3);
704         verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
705         verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
706         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
707
708         // Verify follower 2's log state.
709         verifyFollowersTrimmedLog(2, follower2Actor, 6);
710
711         expSnapshotState.add(payload4);
712         expSnapshotState.add(payload5);
713         expSnapshotState.add(payload6);
714
715         testLog.info("testReplicationsAndSnapshotAfterInstallSnapshot ending");
716     }
717
718     /**
719      * Kill the leader actor, reinstate it and verify the recovered journal.
720      */
721     private void verifyLeaderRecoveryAfterReinstatement(long lastIndex, long snapshotIndex, long firstJournalEntryIndex) {
722         testLog.info("testLeaderReinstatement starting");
723
724         killActor(leaderActor);
725
726         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
727         TestRaftActor testRaftActor = leaderActor.underlyingActor();
728
729         testRaftActor.startDropMessages(RequestVoteReply.class);
730
731         leaderContext = testRaftActor.getRaftActorContext();
732
733         testRaftActor.waitForRecoveryComplete();
734
735         int logSize = (int) (expSnapshotState.size() - firstJournalEntryIndex);
736         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
737         assertEquals("Leader snapshot index", snapshotIndex, leaderContext.getReplicatedLog().getSnapshotIndex());
738         assertEquals("Leader journal log size", logSize, leaderContext.getReplicatedLog().size());
739         assertEquals("Leader journal last index", lastIndex, leaderContext.getReplicatedLog().lastIndex());
740         assertEquals("Leader commit index", lastIndex, leaderContext.getCommitIndex());
741         assertEquals("Leader last applied", lastIndex, leaderContext.getLastApplied());
742
743         for(long i = firstJournalEntryIndex; i < expSnapshotState.size(); i++) {
744             verifyReplicatedLogEntry(leaderContext.getReplicatedLog().get(i), currentTerm, i,
745                     expSnapshotState.get((int) i));
746         }
747
748         assertEquals("Leader applied state", expSnapshotState, testRaftActor.getState());
749
750         testLog.info("testLeaderReinstatement ending");
751     }
752
753     private void sendInitialPayloadsReplicatedToAllFollowers(String... data) {
754
755         // Send the payloads.
756         for(String d: data) {
757             expSnapshotState.add(sendPayloadData(leaderActor, d));
758         }
759
760         int nEntries = data.length;
761
762         // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
763         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, nEntries);
764         for(int i = 0; i < expSnapshotState.size(); i++) {
765             MockPayload payload = expSnapshotState.get(i);
766             verifyApplyState(applyStates.get(i), leaderCollectorActor, payload.toString(), currentTerm, i, payload);
767         }
768
769         // Verify follower 1 applies each log entry.
770         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, nEntries);
771         for(int i = 0; i < expSnapshotState.size(); i++) {
772             MockPayload payload = expSnapshotState.get(i);
773             verifyApplyState(applyStates.get(i), null, null, currentTerm, i, payload);
774         }
775
776         // Verify follower 2 applies each log entry.
777         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, nEntries);
778         for(int i = 0; i < expSnapshotState.size(); i++) {
779             MockPayload payload = expSnapshotState.get(i);
780             verifyApplyState(applyStates.get(i), null, null, currentTerm, i, payload);
781         }
782
783         // Ensure there's at least 1 more heartbeat.
784         MessageCollectorActor.clearMessages(leaderCollectorActor);
785         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
786
787         // The leader should have performed fake snapshots to trim the log to the last index replicated to
788         // all followers.
789         verifyLeadersTrimmedLog(nEntries - 1);
790
791         MessageCollectorActor.clearMessages(leaderCollectorActor);
792         MessageCollectorActor.clearMessages(follower1CollectorActor);
793         MessageCollectorActor.clearMessages(follower2CollectorActor);
794     }
795 }