Move ServerConfigurationPayload to cluster.raft.persisted
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import akka.actor.ActorRef;
13 import akka.persistence.SaveSnapshotSuccess;
14 import com.google.common.collect.ImmutableMap;
15 import java.util.Arrays;
16 import java.util.HashSet;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.Set;
20 import javax.annotation.Nullable;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
24 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
25 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
26 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
27 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
28 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
29 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
30 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
31 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
32 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
33 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
34 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
35 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
36 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
37 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
38 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
39 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
40
41 /**
42  * Tests replication and snapshots end-to-end using real RaftActors and behavior communication with a
43  * lagging follower.
44  *
45  * @author Thomas Pantelis
46  */
47 public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends AbstractRaftActorIntegrationTest {
48
49     private void setup() {
50         leaderId = factory.generateActorId("leader");
51         follower1Id = factory.generateActorId("follower");
52         follower2Id = factory.generateActorId("follower");
53
54         // Setup the persistent journal for the leader - just an election term and no journal/snapshots.
55         InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId));
56
57         // Create the leader and 2 follower actors.
58         follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
59                 follower2Id, testActorPath(follower2Id)), newFollowerConfigParams());
60
61         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
62                 follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
63
64         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().
65                 put(follower1Id, follower1Actor.path().toString()).
66                 put(follower2Id, follower2Actor.path().toString()).build();
67
68         leaderConfigParams = newLeaderConfigParams();
69         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
70
71         waitUntilLeader(leaderActor);
72
73         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
74         leader = leaderActor.underlyingActor().getCurrentBehavior();
75
76         follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
77         follower1 = follower1Actor.underlyingActor().getCurrentBehavior();
78
79         follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
80         follower2 = follower2Actor.underlyingActor().getCurrentBehavior();
81
82         currentTerm = leaderContext.getTermInformation().getCurrentTerm();
83         assertEquals("Current term > " + initialTerm, true, currentTerm > initialTerm);
84
85         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
86         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
87         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
88
89         testLog.info("Leader created and elected");
90     }
91
92     /**
93      * Send 2 payload instances with follower 2 lagging then resume the follower and verifies it gets
94      * caught up via AppendEntries.
95      */
96     @Test
97     public void testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries() throws Exception {
98         testLog.info("testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries starting: sending 2 new payloads");
99
100         setup();
101
102         // Simulate lagging by dropping AppendEntries messages in follower 2.
103         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
104
105         // Send the payloads.
106         MockPayload payload0 = sendPayloadData(leaderActor, "zero");
107         MockPayload payload1 = sendPayloadData(leaderActor, "one");
108
109         // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
110         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
111         verifyApplyState(applyStates.get(0), leaderCollectorActor, payload0.toString(), currentTerm, 0, payload0);
112         verifyApplyState(applyStates.get(1), leaderCollectorActor, payload1.toString(), currentTerm, 1, payload1);
113
114         // Verify follower 1 applies each log entry.
115         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 2);
116         verifyApplyState(applyStates.get(0), null, null, currentTerm, 0, payload0);
117         verifyApplyState(applyStates.get(1), null, null, currentTerm, 1, payload1);
118
119         // Ensure there's at least 1 more heartbeat.
120         MessageCollectorActor.clearMessages(leaderCollectorActor);
121         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
122
123         // The leader should not have performed fake snapshots to trim the log because the entries have not
124         // been replicated to follower 2.
125         assertEquals("Leader snapshot term", -1, leaderContext.getReplicatedLog().getSnapshotTerm());
126         assertEquals("Leader snapshot index", -1, leaderContext.getReplicatedLog().getSnapshotIndex());
127         assertEquals("Leader journal log size", 2, leaderContext.getReplicatedLog().size());
128         assertEquals("Leader journal last index", 1, leaderContext.getReplicatedLog().lastIndex());
129         assertEquals("Leader commit index", 1, leaderContext.getCommitIndex());
130         assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
131         assertEquals("Leader replicatedToAllIndex", -1, leader.getReplicatedToAllIndex());
132
133         testLog.info("testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries: new entries applied - resuming follower {}", follower2Id);
134
135         // Now stop dropping AppendEntries in follower 2.
136         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
137
138         // Verify follower 2 applies each log entry.
139         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 2);
140         verifyApplyState(applyStates.get(0), null, null, currentTerm, 0, payload0);
141         verifyApplyState(applyStates.get(1), null, null, currentTerm, 1, payload1);
142
143         // Ensure there's at least 1 more heartbeat.
144         MessageCollectorActor.clearMessages(leaderCollectorActor);
145         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
146
147         // The leader should now have performed fake snapshots to trim the log.
148         verifyLeadersTrimmedLog(1);
149
150         // Even though follower 2 lagged behind, the leader should not have tried to install a snapshot
151         // to catch it up because no snapshotting was done so the follower's next index was present in the log.
152         InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor,
153                 InstallSnapshot.class);
154         Assert.assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot);
155
156         testLog.info("testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries complete");
157     }
158
159     /**
160      * Send payloads to trigger a leader snapshot due to snapshotBatchCount reached with follower 2
161      * lagging but not enough for the leader to trim its log from the last applied index. Follower 2's log
162      * will be behind by several entries and, when it is resumed, it should be caught up via AppendEntries
163      * sent by the leader.
164      *
165      * @throws Exception
166      */
167     @Test
168     public void testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries() throws Exception {
169         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries starting");
170
171         setup();
172
173         sendInitialPayloadsReplicatedToAllFollowers("zero", "one");
174
175         // Configure follower 2 to drop messages and lag.
176         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
177
178         // Send the first payload and verify it gets applied by the leader and follower 1.
179         MockPayload payload2 = sendPayloadData(leaderActor, "two");
180
181         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
182         verifyApplyState(applyState, leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
183
184         applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
185         verifyApplyState(applyState, null, null, currentTerm, 2, payload2);
186
187         expSnapshotState.add(payload2);
188
189         MessageCollectorActor.clearMessages(leaderCollectorActor);
190         MessageCollectorActor.clearMessages(follower1CollectorActor);
191
192         // Send another payload - this should cause a snapshot due to snapshotBatchCount reached.
193         MockPayload payload3 = sendPayloadData(leaderActor, "three");
194
195         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
196
197         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries: sending 2 more payloads");
198
199         // Send 2 more payloads - not enough to trigger another snapshot.
200         MockPayload payload4 = sendPayloadData(leaderActor, "four");
201         MockPayload payload5 = sendPayloadData(leaderActor, "five");
202
203         // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
204         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
205         verifyApplyState(applyStates.get(0), leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
206         verifyApplyState(applyStates.get(1), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
207         verifyApplyState(applyStates.get(2), leaderCollectorActor, payload5.toString(), currentTerm, 5, payload5);
208
209         // Verify follower 1 applies each log entry.
210         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
211         verifyApplyState(applyStates.get(0), null, null, currentTerm, 3, payload3);
212         verifyApplyState(applyStates.get(1), null, null, currentTerm, 4, payload4);
213         verifyApplyState(applyStates.get(2), null, null, currentTerm, 5, payload5);
214
215         // The snapshot should have caused the leader to advanced the snapshot index to the
216         // last previously applied index (1) that was replicated to all followers at the time of capture.
217         // Note: since the log size (3) did not exceed the snapshot batch count (4), the leader should not
218         // have trimmed the log to the last index actually applied (5).
219         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
220         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
221         assertEquals("Leader journal log size", 4, leaderContext.getReplicatedLog().size());
222         assertEquals("Leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
223         assertEquals("Leader commit index", 5, leaderContext.getCommitIndex());
224         assertEquals("Leader last applied", 5, leaderContext.getLastApplied());
225         assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
226
227         // Now stop dropping AppendEntries in follower 2.
228         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
229
230         // Verify follower 2 applies each log entry. The leader should not install a snapshot b/c
231         // follower 2's next index (3) is still present in the log.
232         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 4);
233         verifyApplyState(applyStates.get(0), null, null, currentTerm, 2, payload2);
234         verifyApplyState(applyStates.get(1), null, null, currentTerm, 3, payload3);
235         verifyApplyState(applyStates.get(2), null, null, currentTerm, 4, payload4);
236         verifyApplyState(applyStates.get(3), null, null, currentTerm, 5, payload5);
237
238         // Verify the leader did not try to install a snapshot to catch up follower 2.
239         InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor, InstallSnapshot.class);
240         Assert.assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot);
241
242         // Ensure there's at least 1 more heartbeat.
243         MessageCollectorActor.clearMessages(leaderCollectorActor);
244         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
245
246         // The leader should now have performed fake snapshots to advance the snapshot index and to trim
247         // the log. In addition replicatedToAllIndex should've advanced.
248         verifyLeadersTrimmedLog(5);
249
250         // Verify the leader's persisted snapshot.
251         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
252         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
253         verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 2, currentTerm, 3);
254         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
255         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
256         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
257
258         // Verify follower 1's log and snapshot indexes.
259         MessageCollectorActor.clearMessages(follower1CollectorActor);
260         MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
261         verifyFollowersTrimmedLog(1, follower1Actor, 5);
262
263         // Verify follower 2's log and snapshot indexes.
264         MessageCollectorActor.clearMessages(follower2CollectorActor);
265         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, AppendEntries.class);
266         verifyFollowersTrimmedLog(2, follower2Actor, 5);
267
268         MessageCollectorActor.clearMessages(leaderCollectorActor);
269         MessageCollectorActor.clearMessages(follower1CollectorActor);
270         MessageCollectorActor.clearMessages(follower2CollectorActor);
271
272         expSnapshotState.add(payload3);
273         expSnapshotState.add(payload4);
274         expSnapshotState.add(payload5);
275
276         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries complete");
277     }
278
279     /**
280      * Send payloads to trigger a leader snapshot due to snapshotBatchCount reached with follower 2
281      * lagging where the leader trims its log from the last applied index. Follower 2's log
282      * will be behind by several entries and, when it is resumed, it should be caught up via a snapshot
283      * installed by the leader.
284      *
285      * @throws Exception
286      */
287     @Test
288     public void testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot() throws Exception {
289         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot starting");
290
291         setup();
292
293         sendInitialPayloadsReplicatedToAllFollowers("zero", "one");
294
295         // Configure follower 2 to drop messages and lag.
296         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
297
298         // Send 5 payloads - the second should cause a leader snapshot.
299         MockPayload payload2 = sendPayloadData(leaderActor, "two");
300         MockPayload payload3 = sendPayloadData(leaderActor, "three");
301         MockPayload payload4 = sendPayloadData(leaderActor, "four");
302         MockPayload payload5 = sendPayloadData(leaderActor, "five");
303         MockPayload payload6 = sendPayloadData(leaderActor, "six");
304
305         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
306
307         // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
308         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 5);
309         verifyApplyState(applyStates.get(0), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
310         verifyApplyState(applyStates.get(2), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
311         verifyApplyState(applyStates.get(4), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
312
313         MessageCollectorActor.clearMessages(leaderCollectorActor);
314
315         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries: sending 1 more payload to trigger second snapshot");
316
317         // Send another payload to trigger a second leader snapshot.
318         MockPayload payload7 = sendPayloadData(leaderActor, "seven");
319
320         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
321
322         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
323         verifyApplyState(applyState, leaderCollectorActor, payload7.toString(), currentTerm, 7, payload7);
324
325         // Verify follower 1 applies each log entry.
326         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 6);
327         verifyApplyState(applyStates.get(0), null, null, currentTerm, 2, payload2);
328         verifyApplyState(applyStates.get(2), null, null, currentTerm, 4, payload4);
329         verifyApplyState(applyStates.get(5), null, null, currentTerm, 7, payload7);
330
331         // The snapshot should have caused the leader to advanced the snapshot index to the leader's last
332         // applied index (6) since the log size should have exceed the snapshot batch count (4).
333         // replicatedToAllIndex should remain at 1 since follower 2 is lagging.
334         verifyLeadersTrimmedLog(7, 1);
335
336         expSnapshotState.add(payload2);
337         expSnapshotState.add(payload3);
338         expSnapshotState.add(payload4);
339         expSnapshotState.add(payload5);
340         expSnapshotState.add(payload6);
341
342         MessageCollectorActor.clearMessages(leaderCollectorActor);
343         MessageCollectorActor.clearMessages(follower1CollectorActor);
344
345         // Send a server config change to test that the install snapshot includes the server config.
346
347         ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(Arrays.asList(
348                 new ServerInfo(leaderId, true),
349                 new ServerInfo(follower1Id, false),
350                 new ServerInfo(follower2Id, false)));
351         leaderContext.updatePeerIds(serverConfig);
352         ((AbstractLeader)leader).updateMinReplicaCount();
353         leaderActor.tell(serverConfig, ActorRef.noSender());
354
355         applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
356         verifyApplyState(applyState, leaderCollectorActor, "serverConfig", currentTerm, 8, serverConfig);
357
358         applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
359         verifyApplyState(applyState, null, null, currentTerm, 8, serverConfig);
360
361         // Verify the leader's persisted snapshot.
362         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
363         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
364         verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7);
365         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
366         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
367         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
368
369         expSnapshotState.add(payload7);
370
371         verifyInstallSnapshotToLaggingFollower(8, serverConfig);
372
373         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot complete");
374     }
375
376     /**
377      * Send payloads with follower 2 lagging with the last payload having a large enough size to trigger a
378      * leader snapshot such that the leader trims its log from the last applied index.. Follower 2's log will
379      * be behind by several entries and, when it is resumed, it should be caught up via a snapshot installed
380      * by the leader.
381      *
382      * @throws Exception
383      */
384     @Test
385     public void testLeaderSnapshotTriggeredByMemoryThresholdExceededWithLaggingFollower() throws Exception {
386         testLog.info("testLeaderSnapshotTriggeredByMemoryThresholdExceededWithLaggingFollower starting");
387
388         snapshotBatchCount = 5;
389         setup();
390
391         sendInitialPayloadsReplicatedToAllFollowers("zero");
392
393         leaderActor.underlyingActor().setMockTotalMemory(1000);
394
395         // We'll expect a ReplicatedLogImplEntry message and an ApplyJournalEntries message added to the journal.
396         InMemoryJournal.addWriteMessagesCompleteLatch(leaderId, 2);
397
398         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
399
400         // Send a payload with a large relative size but not enough to trigger a snapshot.
401         MockPayload payload1 = sendPayloadData(leaderActor, "one", 500);
402
403         // Verify the leader got consensus and applies the first log entry even though follower 2 didn't respond.
404         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 1);
405         verifyApplyState(applyStates.get(0), leaderCollectorActor, payload1.toString(), currentTerm, 1, payload1);
406
407         // Wait for all the ReplicatedLogImplEntry and ApplyJournalEntries messages to be added to the journal
408         // before the snapshot so the snapshot sequence # will be higher to ensure the snapshot gets
409         // purged from the snapshot store after subsequent snapshots.
410         InMemoryJournal.waitForWriteMessagesComplete(leaderId);
411
412         // Verify a snapshot is not triggered.
413         CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
414         Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
415
416         expSnapshotState.add(payload1);
417
418         // Send another payload with a large enough relative size in combination with the last payload
419         // that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot.
420         MockPayload payload2 = sendPayloadData(leaderActor, "two", 201);
421
422         // Verify the leader applies the last log entry.
423         applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
424         verifyApplyState(applyStates.get(1), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
425
426         // Verify follower 1 applies each log entry.
427         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 2);
428         verifyApplyState(applyStates.get(0), null, null, currentTerm, 1, payload1);
429         verifyApplyState(applyStates.get(1), null, null, currentTerm, 2, payload2);
430
431         // A snapshot should've occurred - wait for it to complete.
432         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
433
434         // Because the snapshot was triggered by exceeding the memory threshold the leader should've advanced
435         // the snapshot index to the last applied index and trimmed the log even though the entries weren't
436         // replicated to all followers.
437         verifyLeadersTrimmedLog(2, 0);
438
439         // Verify the leader's persisted snapshot.
440         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
441         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
442         verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 1, currentTerm, 2);
443         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
444         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
445         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 2, payload2);
446
447         expSnapshotState.add(payload2);
448
449         verifyInstallSnapshotToLaggingFollower(2L, null);
450
451         // Sends a payload with index 3.
452         verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot();
453
454         // Sends 3 payloads with indexes 4, 5 and 6.
455         verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot();
456
457         // Recover the leader from persistence and verify.
458         long leadersLastIndexOnRecovery = 6;
459
460         // The leader's last snapshot was triggered by index 4 so the last applied index in the snapshot was 3.
461         long leadersSnapshotIndexOnRecovery = 3;
462
463         // The recovered journal should have 3 entries starting at index 4.
464         long leadersFirstJournalEntryIndexOnRecovery = 4;
465
466         verifyLeaderRecoveryAfterReinstatement(leadersLastIndexOnRecovery, leadersSnapshotIndexOnRecovery,
467                 leadersFirstJournalEntryIndexOnRecovery);
468
469         testLog.info("testLeaderSnapshotTriggeredByMemoryThresholdExceeded ending");
470     }
471
472     /**
473      * Send another payload to verify another snapshot is not done since the last snapshot trimmed the
474      * first log entry so the memory threshold should not be exceeded.
475      *
476      * @throws Exception
477      */
478     private void verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot() throws Exception {
479         ApplyState applyState;
480         CaptureSnapshot captureSnapshot;
481
482         MockPayload payload3 = sendPayloadData(leaderActor, "three");
483
484         // Verify the leader applies the state.
485         applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
486         verifyApplyState(applyState, leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
487
488         captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
489         Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
490
491         // Verify the follower 1 applies the state.
492         applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
493         verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
494
495         // Verify the follower 2 applies the state.
496         applyState = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
497         verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
498
499         // Verify the leader's state.
500         verifyLeadersTrimmedLog(3);
501
502         // Verify follower 1's state.
503         verifyFollowersTrimmedLog(1, follower1Actor, 3);
504
505         // Verify follower 2's state.
506         verifyFollowersTrimmedLog(2, follower2Actor, 3);
507
508         // Revert back to JVM total memory.
509         leaderActor.underlyingActor().setMockTotalMemory(0);
510
511         MessageCollectorActor.clearMessages(leaderCollectorActor);
512         MessageCollectorActor.clearMessages(follower1CollectorActor);
513         MessageCollectorActor.clearMessages(follower2CollectorActor);
514
515         expSnapshotState.add(payload3);
516     }
517
518     /**
519      * Resume the lagging follower 2 and verify it receives an install snapshot from the leader.
520      *
521      * @throws Exception
522      */
523     private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex,
524             @Nullable ServerConfigurationPayload expServerConfig) throws Exception {
525         List<Snapshot> persistedSnapshots;
526         List<ReplicatedLogEntry> unAppliedEntry;
527         ApplySnapshot applySnapshot;
528         InstallSnapshot installSnapshot;
529
530         testLog.info("testInstallSnapshotToLaggingFollower starting");
531
532         MessageCollectorActor.clearMessages(leaderCollectorActor);
533
534         // Now stop dropping AppendEntries in follower 2.
535         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
536
537
538         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
539
540         // Verify the leader's persisted snapshot. The previous snapshot (currently) won't be deleted from
541         // the snapshot store because the second snapshot was initiated by the follower install snapshot and
542         // not because the batch count was reached so the persisted journal sequence number wasn't advanced
543         // far enough to cause the previous snapshot to be deleted. This is because
544         // RaftActor#trimPersistentData subtracts the snapshotBatchCount from the snapshot's sequence number.
545         // This is OK - the next snapshot should delete it. In production, even if the system restarted
546         // before another snapshot, they would both get applied which wouldn't hurt anything.
547         persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
548         Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
549         Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
550         verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
551         unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
552         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
553
554         int snapshotSize = persistedSnapshot.getState().length;
555         int expTotalChunks = (snapshotSize / SNAPSHOT_CHUNK_SIZE) + ((snapshotSize % SNAPSHOT_CHUNK_SIZE) > 0 ? 1 : 0);
556
557         installSnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, InstallSnapshot.class);
558         assertEquals("InstallSnapshot getTerm", currentTerm, installSnapshot.getTerm());
559         assertEquals("InstallSnapshot getLeaderId", leaderId, installSnapshot.getLeaderId());
560         assertEquals("InstallSnapshot getChunkIndex", 1, installSnapshot.getChunkIndex());
561         assertEquals("InstallSnapshot getTotalChunks", expTotalChunks, installSnapshot.getTotalChunks());
562         assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
563         assertEquals("InstallSnapshot getLastIncludedIndex", lastAppliedIndex, installSnapshot.getLastIncludedIndex());
564         //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
565
566         List<InstallSnapshotReply> installSnapshotReplies = MessageCollectorActor.expectMatching(
567                 leaderCollectorActor, InstallSnapshotReply.class, expTotalChunks);
568         int index = 1;
569         for(InstallSnapshotReply installSnapshotReply: installSnapshotReplies) {
570             assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
571             assertEquals("InstallSnapshotReply getChunkIndex", index++, installSnapshotReply.getChunkIndex());
572             assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
573             assertEquals("InstallSnapshotReply isSuccess", true, installSnapshotReply.isSuccess());
574         }
575
576         // Verify follower 2 applies the snapshot.
577         applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
578         verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
579         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size());
580
581         // Wait for the snapshot to complete.
582         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
583
584         // Ensure there's at least 1 more heartbeat.
585         MessageCollectorActor.clearMessages(leaderCollectorActor);
586         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
587
588         // The leader should now have performed fake snapshots to advance the snapshot index and to trim
589         // the log. In addition replicatedToAllIndex should've advanced.
590         verifyLeadersTrimmedLog(lastAppliedIndex);
591
592         if(expServerConfig != null) {
593             Set<ServerInfo> expServerInfo = new HashSet<>(expServerConfig.getServerConfig());
594             assertEquals("Leader snapshot server config", expServerInfo,
595                     new HashSet<>(persistedSnapshot.getServerConfiguration().getServerConfig()));
596
597             assertEquals("Follower 2 snapshot server config", expServerInfo,
598                     new HashSet<>(applySnapshot.getSnapshot().getServerConfiguration().getServerConfig()));
599
600             ServerConfigurationPayload follower2ServerConfig = follower2Context.getPeerServerInfo(true);
601             assertNotNull("Follower 2 server config is null", follower2ServerConfig);
602
603             assertEquals("Follower 2 server config", expServerInfo,
604                     new HashSet<>(follower2ServerConfig.getServerConfig()));
605         }
606
607         MessageCollectorActor.clearMessages(leaderCollectorActor);
608         MessageCollectorActor.clearMessages(follower1CollectorActor);
609         MessageCollectorActor.clearMessages(follower2CollectorActor);
610
611         testLog.info("testInstallSnapshotToLaggingFollower complete");
612     }
613
614     /**
615      * Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and
616      * snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower.
617      *
618      * @throws Exception
619      */
620     private void verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot() throws Exception {
621         List<ApplyState> applyStates;
622         ApplyState applyState;
623
624         testLog.info("testReplicationsAndSnapshotAfterInstallSnapshot starting: replicatedToAllIndex: {}",
625                 leader.getReplicatedToAllIndex());
626
627         // Send another payload - a snapshot should occur.
628         MockPayload payload4 = sendPayloadData(leaderActor, "four");
629
630         // Wait for the snapshot to complete.
631         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
632
633         applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
634         verifyApplyState(applyState, leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
635
636         // Verify the leader's last persisted snapshot (previous ones may not be purged yet).
637         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
638         Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
639         verifySnapshot("Persisted", persistedSnapshot, currentTerm, 3, currentTerm, 4);
640         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
641         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
642         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4);
643
644         // Send a couple more payloads.
645         MockPayload payload5 = sendPayloadData(leaderActor, "five");
646         MockPayload payload6 = sendPayloadData(leaderActor, "six");
647
648         // Verify the leader applies the 2 log entries.
649         applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
650         verifyApplyState(applyStates.get(1), leaderCollectorActor, payload5.toString(), currentTerm, 5, payload5);
651         verifyApplyState(applyStates.get(2), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
652
653         // Verify the leader applies a log entry for at least the last entry index.
654         verifyApplyJournalEntries(leaderCollectorActor, 6);
655
656         // Ensure there's at least 1 more heartbeat to trim the log.
657         MessageCollectorActor.clearMessages(leaderCollectorActor);
658         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
659
660         // Verify the leader's final state.
661         verifyLeadersTrimmedLog(6);
662
663         InMemoryJournal.dumpJournal(leaderId);
664
665         // Verify the leaders's persisted journal log - it should only contain the last 2 ReplicatedLogEntries
666         // added after the snapshot as the persisted journal should've been purged to the snapshot
667         // sequence number.
668         verifyPersistedJournal(leaderId, Arrays.asList(new ReplicatedLogImplEntry(5, currentTerm, payload5),
669                 new ReplicatedLogImplEntry(6, currentTerm, payload6)));
670
671         // Verify the leaders's persisted journal contains an ApplyJournalEntries for at least the last entry index.
672         List<ApplyJournalEntries> persistedApplyJournalEntries = InMemoryJournal.get(leaderId, ApplyJournalEntries.class);
673         boolean found = false;
674         for(ApplyJournalEntries entry: persistedApplyJournalEntries) {
675             if(entry.getToIndex() == 6) {
676                 found = true;
677                 break;
678             }
679         }
680
681         Assert.assertTrue(String.format("ApplyJournalEntries with index %d not found in leader's persisted journal", 6), found);
682
683         // Verify follower 1 applies the 3 log entries.
684         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
685         verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
686         verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
687         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
688
689         // Verify follower 1's log state.
690         verifyFollowersTrimmedLog(1, follower1Actor, 6);
691
692         // Verify follower 2 applies the 3 log entries.
693         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3);
694         verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
695         verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
696         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
697
698         // Verify follower 2's log state.
699         verifyFollowersTrimmedLog(2, follower2Actor, 6);
700
701         expSnapshotState.add(payload4);
702         expSnapshotState.add(payload5);
703         expSnapshotState.add(payload6);
704
705         testLog.info("testReplicationsAndSnapshotAfterInstallSnapshot ending");
706     }
707
708     /**
709      * Kill the leader actor, reinstate it and verify the recovered journal.
710      */
711     private void verifyLeaderRecoveryAfterReinstatement(long lastIndex, long snapshotIndex, long firstJournalEntryIndex) {
712         testLog.info("testLeaderReinstatement starting");
713
714         killActor(leaderActor);
715
716         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
717         TestRaftActor testRaftActor = leaderActor.underlyingActor();
718
719         testRaftActor.startDropMessages(RequestVoteReply.class);
720
721         leaderContext = testRaftActor.getRaftActorContext();
722
723         testRaftActor.waitForRecoveryComplete();
724
725         int logSize = (int) (expSnapshotState.size() - firstJournalEntryIndex);
726         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
727         assertEquals("Leader snapshot index", snapshotIndex, leaderContext.getReplicatedLog().getSnapshotIndex());
728         assertEquals("Leader journal log size", logSize, leaderContext.getReplicatedLog().size());
729         assertEquals("Leader journal last index", lastIndex, leaderContext.getReplicatedLog().lastIndex());
730         assertEquals("Leader commit index", lastIndex, leaderContext.getCommitIndex());
731         assertEquals("Leader last applied", lastIndex, leaderContext.getLastApplied());
732
733         for(long i = firstJournalEntryIndex; i < expSnapshotState.size(); i++) {
734             verifyReplicatedLogEntry(leaderContext.getReplicatedLog().get(i), currentTerm, i,
735                     expSnapshotState.get((int) i));
736         }
737
738         assertEquals("Leader applied state", expSnapshotState, testRaftActor.getState());
739
740         testLog.info("testLeaderReinstatement ending");
741     }
742
743     private void sendInitialPayloadsReplicatedToAllFollowers(String... data) {
744
745         // Send the payloads.
746         for(String d: data) {
747             expSnapshotState.add(sendPayloadData(leaderActor, d));
748         }
749
750         int nEntries = data.length;
751
752         // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
753         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, nEntries);
754         for(int i = 0; i < expSnapshotState.size(); i++) {
755             MockPayload payload = expSnapshotState.get(i);
756             verifyApplyState(applyStates.get(i), leaderCollectorActor, payload.toString(), currentTerm, i, payload);
757         }
758
759         // Verify follower 1 applies each log entry.
760         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, nEntries);
761         for(int i = 0; i < expSnapshotState.size(); i++) {
762             MockPayload payload = expSnapshotState.get(i);
763             verifyApplyState(applyStates.get(i), null, null, currentTerm, i, payload);
764         }
765
766         // Verify follower 2 applies each log entry.
767         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, nEntries);
768         for(int i = 0; i < expSnapshotState.size(); i++) {
769             MockPayload payload = expSnapshotState.get(i);
770             verifyApplyState(applyStates.get(i), null, null, currentTerm, i, payload);
771         }
772
773         // Ensure there's at least 1 more heartbeat.
774         MessageCollectorActor.clearMessages(leaderCollectorActor);
775         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
776
777         // The leader should have performed fake snapshots to trim the log to the last index replicated to
778         // all followers.
779         verifyLeadersTrimmedLog(nEntries - 1);
780
781         MessageCollectorActor.clearMessages(leaderCollectorActor);
782         MessageCollectorActor.clearMessages(follower1CollectorActor);
783         MessageCollectorActor.clearMessages(follower2CollectorActor);
784     }
785 }