Change ReplicatedLogImplEntry to Externalizable proxy pattern
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12
13 import akka.actor.ActorRef;
14 import akka.persistence.SaveSnapshotSuccess;
15 import com.google.common.collect.ImmutableMap;
16 import com.google.common.util.concurrent.Uninterruptibles;
17 import java.util.Arrays;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Set;
22 import java.util.concurrent.TimeUnit;
23 import javax.annotation.Nullable;
24 import org.junit.Assert;
25 import org.junit.Test;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
27 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
28 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
29 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
30 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
31 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
32 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
33 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
34 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
35 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
36 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
37 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
38 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
39 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
40 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
41 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
42 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
43 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
44
45 /**
46  * Tests replication and snapshots end-to-end using real RaftActors and behavior communication with a
47  * lagging follower.
48  *
49  * @author Thomas Pantelis
50  */
51 public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends AbstractRaftActorIntegrationTest {
52
53     private void setup() {
           // Builds the 3-node cluster used by the tests in this class: one leader and two followers, each a
           // test RaftActor. On return the leader has been elected and the ids, actors, contexts, behaviors
           // and collector actors of all three nodes have been stored in the test's fields.
54         leaderId = factory.generateActorId("leader");
55         follower1Id = factory.generateActorId("follower");
56         follower2Id = factory.generateActorId("follower");
57
58         // Setup the persistent journal for the leader - just an election term and no journal/snapshots.
59         InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId));
60
61         // Create the leader and 2 follower actors.
           // The followers are created first: the leader's peer addresses below are read from the follower
           // actors' own paths, whereas the followers only need paths derived from ids via testActorPath.
62         follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
63                 follower2Id, testActorPath(follower2Id)), newFollowerConfigParams());
64
65         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
66                 follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
67
68         Map<String, String> leaderPeerAddresses = ImmutableMap.<String, String>builder()
69                 .put(follower1Id, follower1Actor.path().toString())
70                 .put(follower2Id, follower2Actor.path().toString()).build();
71
72         leaderConfigParams = newLeaderConfigParams();
73         leaderActor = newTestRaftActor(leaderId, leaderPeerAddresses, leaderConfigParams);
74
           // Block until the leader actor has actually taken leadership before sampling any state.
75         waitUntilLeader(leaderActor);
76
           // Cache each node's context and current behavior so the tests can assert on them directly.
77         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
78         leader = leaderActor.underlyingActor().getCurrentBehavior();
79
80         follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
81         follower1 = follower1Actor.underlyingActor().getCurrentBehavior();
82
83         follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
84         follower2 = follower2Actor.underlyingActor().getCurrentBehavior();
85
           // The journal was seeded with initialTerm above; after the election the leader's term is
           // expected to be strictly higher. assertTrue states that boolean condition directly.
86         currentTerm = leaderContext.getTermInformation().getCurrentTerm();
87         Assert.assertTrue("Current term > " + initialTerm, currentTerm > initialTerm);
88
           // The collector actors are what the tests query for the messages each node has seen.
89         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
90         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
91         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
92
93         testLog.info("Leader created and elected");
94     }
95
96     /**
97      * Sends 2 payload instances with follower 2 lagging, then resumes the follower and verifies it gets
98      * caught up via AppendEntries.
99      */
100     @Test
101     public void testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries() throws Exception {
102         testLog.info("testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries starting: sending 2 new payloads");
103
104         setup();
105
106         // Simulate lagging by dropping AppendEntries messages in follower 2.
107         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
108
109         // Send the payloads.
            // These are the first payloads sent after setup(), so they are expected at log indexes 0 and 1.
110         MockPayload payload0 = sendPayloadData(leaderActor, "zero");
111         MockPayload payload1 = sendPayloadData(leaderActor, "one");
112
113         // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
            // With follower 2 silent, only the leader and follower 1 (2 of the 3 nodes) take part.
114         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
115         verifyApplyState(applyStates.get(0), leaderCollectorActor, payload0.toString(), currentTerm, 0, payload0);
116         verifyApplyState(applyStates.get(1), leaderCollectorActor, payload1.toString(), currentTerm, 1, payload1);
117
118         // Verify follower 1 applies each log entry.
            // On the follower side the ApplyState is verified with null for the 2nd and 3rd arguments.
119         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 2);
120         verifyApplyState(applyStates.get(0), null, null, currentTerm, 0, payload0);
121         verifyApplyState(applyStates.get(1), null, null, currentTerm, 1, payload1);
122
123         // Ensure there's at least 1 more heartbeat.
            // Clearing first guarantees that the AppendEntriesReply waited for arrived after this point, so
            // the leader state checked below was sampled after a fresh reply reached the leader.
124         MessageCollectorActor.clearMessages(leaderCollectorActor);
125         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
126
127         // The leader should not have performed fake snapshots to trim the log because the entries have not
128         // been replicated to follower 2.
            // Hence the snapshot term/index and replicatedToAllIndex are all expected to still be -1 and both
            // entries are expected to still be in the journal log.
129         assertEquals("Leader snapshot term", -1, leaderContext.getReplicatedLog().getSnapshotTerm());
130         assertEquals("Leader snapshot index", -1, leaderContext.getReplicatedLog().getSnapshotIndex());
131         assertEquals("Leader journal log size", 2, leaderContext.getReplicatedLog().size());
132         assertEquals("Leader journal last index", 1, leaderContext.getReplicatedLog().lastIndex());
133         assertEquals("Leader commit index", 1, leaderContext.getCommitIndex());
134         assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
135         assertEquals("Leader replicatedToAllIndex", -1, leader.getReplicatedToAllIndex());
136
137         testLog.info(
138             "testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries: new entries applied - resuming follower {}",
139             follower2Id);
140
141         // Now stop dropping AppendEntries in follower 2.
142         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
143
144         // Verify follower 2 applies each log entry.
145         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 2);
146         verifyApplyState(applyStates.get(0), null, null, currentTerm, 0, payload0);
147         verifyApplyState(applyStates.get(1), null, null, currentTerm, 1, payload1);
148
149         // Ensure there's at least 1 more heartbeat.
            // Same clear-then-wait step as above, this time after follower 2 has caught up.
150         MessageCollectorActor.clearMessages(leaderCollectorActor);
151         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
152
153         // The leader should now have performed fake snapshots to trim the log.
154         verifyLeadersTrimmedLog(1);
155
156         // Even though follower 2 lagged behind, the leader should not have tried to install a snapshot
157         // to catch it up because no snapshotting was done so the follower's next index was present in the log.
            // The lookup result is asserted to be null, i.e. follower 2's collector holds no InstallSnapshot.
158         InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor,
159                 InstallSnapshot.class);
160         Assert.assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot);
161
162         testLog.info("testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries complete");
163     }
164
165     /**
166      * Send payloads to trigger a leader snapshot due to snapshotBatchCount reached with follower 2
167      * lagging but not enough for the leader to trim its log from the last applied index. Follower 2's log
168      * will be behind by several entries and, when it is resumed, it should be caught up via AppendEntries
169      * sent by the leader.
170      */
171     @Test
172     public void testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries() throws Exception {
173         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries starting");
174
175         setup();
176
            // Two payloads are sent before any lagging is simulated, so the payloads sent below are
            // expected to start at log index 2.
177         sendInitialPayloadsReplicatedToAllFollowers("zero", "one");
178
179         // Configure follower 2 to drop messages and lag.
180         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
181
182         // Send the first payload and verify it gets applied by the leader and follower 1.
183         MockPayload payload2 = sendPayloadData(leaderActor, "two");
184
185         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
186         verifyApplyState(applyState, leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
187
188         applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
189         verifyApplyState(applyState, null, null, currentTerm, 2, payload2);
190
            // Record payload2 as expected snapshot state before the snapshot is triggered below.
            // NOTE(review): expSnapshotState is declared outside this view - presumably consumed by the
            // verifySnapshot helper; confirm.
191         expSnapshotState.add(payload2);
192
            // Start from empty collectors so the counts expected below only cover payloads 3, 4 and 5.
193         MessageCollectorActor.clearMessages(leaderCollectorActor);
194         MessageCollectorActor.clearMessages(follower1CollectorActor);
195
196         // Send another payload - this should cause a snapshot due to snapshotBatchCount reached.
197         MockPayload payload3 = sendPayloadData(leaderActor, "three");
198
            // Wait for the snapshot to be saved before sending anything else.
199         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
200
201         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries: sending 2 more payloads");
202
203         // Send 2 more payloads - not enough to trigger another snapshot.
204         MockPayload payload4 = sendPayloadData(leaderActor, "four");
205         MockPayload payload5 = sendPayloadData(leaderActor, "five");
206
207         // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
208         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
209         verifyApplyState(applyStates.get(0), leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
210         verifyApplyState(applyStates.get(1), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
211         verifyApplyState(applyStates.get(2), leaderCollectorActor, payload5.toString(), currentTerm, 5, payload5);
212
213         // Verify follower 1 applies each log entry.
214         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
215         verifyApplyState(applyStates.get(0), null, null, currentTerm, 3, payload3);
216         verifyApplyState(applyStates.get(1), null, null, currentTerm, 4, payload4);
217         verifyApplyState(applyStates.get(2), null, null, currentTerm, 5, payload5);
218
219         // The snapshot should have caused the leader to advance the snapshot index to the
220         // last previously applied index (1) that was replicated to all followers at the time of capture.
221         // Note: since the log size (3) did not exceed the snapshot batch count (4), the leader should not
222         // have trimmed the log to the last index actually applied (5).
            // With the snapshot index at 1 and the last index at 5, entries 2 through 5 (4 entries) are
            // expected to remain in the journal log.
223         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
224         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
225         assertEquals("Leader journal log size", 4, leaderContext.getReplicatedLog().size());
226         assertEquals("Leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
227         assertEquals("Leader commit index", 5, leaderContext.getCommitIndex());
228         assertEquals("Leader last applied", 5, leaderContext.getLastApplied());
229         assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
230
231         // Now stop dropping AppendEntries in follower 2.
232         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
233
234         // Verify follower 2 applies each log entry. The leader should not install a snapshot b/c
235         // follower 2's next index (3) is still present in the log.
            // Follower 2 missed payloads 2 through 5, hence 4 ApplyState messages are expected.
236         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 4);
237         verifyApplyState(applyStates.get(0), null, null, currentTerm, 2, payload2);
238         verifyApplyState(applyStates.get(1), null, null, currentTerm, 3, payload3);
239         verifyApplyState(applyStates.get(2), null, null, currentTerm, 4, payload4);
240         verifyApplyState(applyStates.get(3), null, null, currentTerm, 5, payload5);
241
242         // Verify the leader did not try to install a snapshot to catch up follower 2.
243         InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor,
244                 InstallSnapshot.class);
245         Assert.assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot);
246
247         // Ensure there's at least 1 more heartbeat.
            // Clear first so the AppendEntriesReply waited for is one that arrived after follower 2 caught up.
248         MessageCollectorActor.clearMessages(leaderCollectorActor);
249         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
250
251         // The leader should now have performed fake snapshots to advance the snapshot index and to trim
252         // the log. In addition replicatedToAllIndex should've advanced.
253         verifyLeadersTrimmedLog(5);
254
255         // Verify the leader's persisted snapshot.
            // Exactly one snapshot is expected in the store, carrying the entry for payload3 (index 3) as
            // its single unapplied entry.
256         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
257         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
258         verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 2, currentTerm, 3);
259         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
260         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
261         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
262
263         // Verify follower 1's log and snapshot indexes.
            // Wait for a fresh AppendEntries to reach the follower before inspecting its log.
264         MessageCollectorActor.clearMessages(follower1CollectorActor);
265         MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
266         verifyFollowersTrimmedLog(1, follower1Actor, 5);
267
268         // Verify follower 2's log and snapshot indexes.
269         MessageCollectorActor.clearMessages(follower2CollectorActor);
270         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, AppendEntries.class);
271         verifyFollowersTrimmedLog(2, follower2Actor, 5);
272
            // Leave all collectors empty and the expected snapshot state up to date at the end of the test.
273         MessageCollectorActor.clearMessages(leaderCollectorActor);
274         MessageCollectorActor.clearMessages(follower1CollectorActor);
275         MessageCollectorActor.clearMessages(follower2CollectorActor);
276
277         expSnapshotState.add(payload3);
278         expSnapshotState.add(payload4);
279         expSnapshotState.add(payload5);
280
281         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries complete");
282     }
283
284     /**
285      * Send payloads to trigger a leader snapshot due to snapshotBatchCount reached with follower 2
286      * lagging where the leader trims its log from the last applied index. Follower 2's log
287      * will be behind by several entries and, when it is resumed, it should be caught up via a snapshot
288      * installed by the leader.
289      */
290     @Test
291     public void testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot() throws Exception {
292         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot starting");
293
294         setup();
295
            // Two payloads are sent before any lagging is simulated, so the payloads sent below are
            // expected to start at log index 2.
296         sendInitialPayloadsReplicatedToAllFollowers("zero", "one");
297
298         // Configure follower 2 to drop messages and lag.
299         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
300
301         // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
            // The extra 5 ms puts the sleep just past the configured interval.
302         Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
303                 TimeUnit.MILLISECONDS);
304
305         // Send 5 payloads - the second should cause a leader snapshot.
306         final MockPayload payload2 = sendPayloadData(leaderActor, "two");
307         final MockPayload payload3 = sendPayloadData(leaderActor, "three");
308         final MockPayload payload4 = sendPayloadData(leaderActor, "four");
309         final MockPayload payload5 = sendPayloadData(leaderActor, "five");
310         final MockPayload payload6 = sendPayloadData(leaderActor, "six");
311
312         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
313
314         // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
            // All 5 ApplyState messages are awaited; only the 1st, 3rd and 5th are checked in detail.
315         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 5);
316         verifyApplyState(applyStates.get(0), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
317         verifyApplyState(applyStates.get(2), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
318         verifyApplyState(applyStates.get(4), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
319
            // Clear so the SaveSnapshotSuccess and ApplyState awaited below belong to the next payload.
320         MessageCollectorActor.clearMessages(leaderCollectorActor);
321
322         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot: "
323                 + "sending 1 more payload to trigger second snapshot");
324
325         // Send another payload to trigger a second leader snapshot.
326         MockPayload payload7 = sendPayloadData(leaderActor, "seven");
327
328         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
329
330         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
331         verifyApplyState(applyState, leaderCollectorActor, payload7.toString(), currentTerm, 7, payload7);
332
333         // Verify follower 1 applies each log entry.
            // Follower 1's collector has not been cleared since payload2 was sent, so it is expected to
            // hold 6 ApplyState messages (indexes 2 through 7); the 1st, 3rd and 6th are checked in detail.
334         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 6);
335         verifyApplyState(applyStates.get(0), null, null, currentTerm, 2, payload2);
336         verifyApplyState(applyStates.get(2), null, null, currentTerm, 4, payload4);
337         verifyApplyState(applyStates.get(5), null, null, currentTerm, 7, payload7);
338
339         // The snapshot should have caused the leader to advance the snapshot index to the leader's last
340         // applied index (6) since the log size should have exceeded the snapshot batch count (4).
341         // replicatedToAllIndex should remain at 1 since follower 2 is lagging.
            // NOTE(review): the first argument is 7 while the comment above speaks of index 6 - presumably
            // the helper takes the last log index rather than the snapshot index; confirm against
            // verifyLeadersTrimmedLog.
342         verifyLeadersTrimmedLog(7, 1);
343
344         expSnapshotState.add(payload2);
345         expSnapshotState.add(payload3);
346         expSnapshotState.add(payload4);
347         expSnapshotState.add(payload5);
348         expSnapshotState.add(payload6);
349
350         MessageCollectorActor.clearMessages(leaderCollectorActor);
351         MessageCollectorActor.clearMessages(follower1CollectorActor);
352
353         // Send a server config change to test that the install snapshot includes the server config.
354
            // NOTE(review): the boolean passed to ServerInfo is presumably the voting flag (leader true,
            // followers false); confirm against ServerInfo.
355         ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(Arrays.asList(
356                 new ServerInfo(leaderId, true),
357                 new ServerInfo(follower1Id, false),
358                 new ServerInfo(follower2Id, false)));
            // The new configuration is applied to the leader's context and behavior directly before the
            // payload itself is sent to the leader actor for replication.
359         leaderContext.updatePeerIds(serverConfig);
360         ((AbstractLeader)leader).updateMinReplicaCount();
361         leaderActor.tell(serverConfig, ActorRef.noSender());
362
            // The config payload is expected at log index 8, on the leader and on follower 1.
363         applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
364         verifyApplyState(applyState, leaderCollectorActor, "serverConfig", currentTerm, 8, serverConfig);
365
366         applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
367         verifyApplyState(applyState, null, null, currentTerm, 8, serverConfig);
368
369         // Verify the leader's persisted snapshot.
            // Exactly one snapshot is expected in the store, carrying the entry for payload7 (index 7) as
            // its single unapplied entry.
370         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
371         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
372         verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7);
373         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
374         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
375         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
376
377         expSnapshotState.add(payload7);
378
            // Resume follower 2 and check the snapshot installed on it: last applied index 8 (the server
            // config entry) and the server config itself as the expected configuration.
379         verifyInstallSnapshotToLaggingFollower(8, serverConfig);
380
381         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot complete");
382     }
383
384     /**
385      * Send payloads with follower 2 lagging with the last payload having a large enough size to trigger a
386      * leader snapshot such that the leader trims its log from the last applied index. Follower 2's log will
387      * be behind by several entries and, when it is resumed, it should be caught up via a snapshot installed
388      * by the leader.
389      */
390     @Test
391     public void testLeaderSnapshotTriggeredByMemoryThresholdExceededWithLaggingFollower() throws Exception {
392         testLog.info("testLeaderSnapshotTriggeredByMemoryThresholdExceededWithLaggingFollower starting");
393
394         snapshotBatchCount = 5;
395         setup();
396
397         sendInitialPayloadsReplicatedToAllFollowers("zero");
398
399         leaderActor.underlyingActor().setMockTotalMemory(1000);
400
401         // We'll expect a ReplicatedLogImplEntry message and an ApplyJournalEntries message added to the journal.
402         InMemoryJournal.addWriteMessagesCompleteLatch(leaderId, 2);
403
404         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
405
406         // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
407         Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
408                 TimeUnit.MILLISECONDS);
409
410         // Send a payload with a large relative size but not enough to trigger a snapshot.
411         MockPayload payload1 = sendPayloadData(leaderActor, "one", 500);
412
413         // Verify the leader got consensus and applies the first log entry even though follower 2 didn't respond.
414         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 1);
415         verifyApplyState(applyStates.get(0), leaderCollectorActor, payload1.toString(), currentTerm, 1, payload1);
416
417         // Wait for all the ReplicatedLogImplEntry and ApplyJournalEntries messages to be added to the journal
418         // before the snapshot so the snapshot sequence # will be higher to ensure the snapshot gets
419         // purged from the snapshot store after subsequent snapshots.
420         InMemoryJournal.waitForWriteMessagesComplete(leaderId);
421
422         // Verify a snapshot is not triggered.
423         CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor,
424                 CaptureSnapshot.class);
425         Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
426
427         expSnapshotState.add(payload1);
428
429         // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
430         Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
431                 TimeUnit.MILLISECONDS);
432
433         // Send another payload with a large enough relative size in combination with the last payload
434         // that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot.
435         MockPayload payload2 = sendPayloadData(leaderActor, "two", 201);
436
437         // Verify the leader applies the last log entry.
438         applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
439         verifyApplyState(applyStates.get(1), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
440
441         // Verify follower 1 applies each log entry.
442         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 2);
443         verifyApplyState(applyStates.get(0), null, null, currentTerm, 1, payload1);
444         verifyApplyState(applyStates.get(1), null, null, currentTerm, 2, payload2);
445
446         // A snapshot should've occurred - wait for it to complete.
447         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
448
449         // Because the snapshot was triggered by exceeding the memory threshold the leader should've advanced
450         // the snapshot index to the last applied index and trimmed the log even though the entries weren't
451         // replicated to all followers.
452         verifyLeadersTrimmedLog(2, 0);
453
454         // Verify the leader's persisted snapshot.
455         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
456         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
457         verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 1, currentTerm, 2);
458         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
459         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
460         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 2, payload2);
461
462         expSnapshotState.add(payload2);
463
464         verifyInstallSnapshotToLaggingFollower(2L, null);
465
466         // Sends a payload with index 3.
467         verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot();
468
469         // Sends 3 payloads with indexes 4, 5 and 6.
470         long leadersSnapshotIndexOnRecovery = verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot();
471
472         // Recover the leader from persistence and verify.
473         long leadersLastIndexOnRecovery = 6;
474
475         long leadersFirstJournalEntryIndexOnRecovery = leadersSnapshotIndexOnRecovery + 1;
476
477         verifyLeaderRecoveryAfterReinstatement(leadersLastIndexOnRecovery, leadersSnapshotIndexOnRecovery,
478                 leadersFirstJournalEntryIndexOnRecovery);
479
480         testLog.info("testLeaderSnapshotTriggeredByMemoryThresholdExceeded ending");
481     }
482
483     /**
484      * Send another payload to verify another snapshot is not done since the last snapshot trimmed the
485      * first log entry so the memory threshold should not be exceeded.
486      */
487     private void verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot() throws Exception {
488         ApplyState applyState;
489         CaptureSnapshot captureSnapshot;
490
            // The caller describes this payload as the one with index 3; the checks below expect it at
            // index 3 on all three nodes.
491         MockPayload payload3 = sendPayloadData(leaderActor, "three");
492
493         // Verify the leader applies the state.
494         applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
495         verifyApplyState(applyState, leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
496
            // The lookup result is asserted to be null, i.e. the leader's collector holds no CaptureSnapshot
            // and so no further snapshot was started for this payload.
497         captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
498         Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
499
500         // Verify the follower 1 applies the state.
501         applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
502         verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
503
504         // Verify the follower 2 applies the state.
            // Follower 2 is expected to apply the entry as well, i.e. it is no longer lagging at this point.
505         applyState = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
506         verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
507
508         // Verify the leader's state.
509         verifyLeadersTrimmedLog(3);
510
511         // Verify follower 1's state.
512         verifyFollowersTrimmedLog(1, follower1Actor, 3);
513
514         // Verify follower 2's state.
515         verifyFollowersTrimmedLog(2, follower2Actor, 3);
516
517         // Revert back to JVM total memory.
518         leaderActor.underlyingActor().setMockTotalMemory(0);
519
            // Leave all collectors empty for whatever the calling test does next.
520         MessageCollectorActor.clearMessages(leaderCollectorActor);
521         MessageCollectorActor.clearMessages(follower1CollectorActor);
522         MessageCollectorActor.clearMessages(follower2CollectorActor);
523
            // Record payload3 as part of the expected snapshot state for later verification.
524         expSnapshotState.add(payload3);
525     }
526
527     /**
528      * Resume the lagging follower 2 and verify it receives an install snapshot from the leader.
529      */
530     private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex,
531             @Nullable ServerConfigurationPayload expServerConfig) throws Exception {
532         testLog.info("verifyInstallSnapshotToLaggingFollower starting");
533
534         MessageCollectorActor.clearMessages(leaderCollectorActor);
535
536         // Now stop dropping AppendEntries in follower 2.
537         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
538
539
540         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
541
542         // Verify the leader's persisted snapshot. The previous snapshot (currently) won't be deleted from
543         // the snapshot store because the second snapshot was initiated by the follower install snapshot and
544         // not because the batch count was reached so the persisted journal sequence number wasn't advanced
545         // far enough to cause the previous snapshot to be deleted. This is because
546         // RaftActor#trimPersistentData subtracts the snapshotBatchCount from the snapshot's sequence number.
547         // This is OK - the next snapshot should delete it. In production, even if the system restarted
548         // before another snapshot, they would both get applied which wouldn't hurt anything.
549         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
550         Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
551         Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
552         verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
553         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
554         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
555
556         int snapshotSize = persistedSnapshot.getState().length;
557         final int expTotalChunks = snapshotSize / SNAPSHOT_CHUNK_SIZE
558                 + (snapshotSize % SNAPSHOT_CHUNK_SIZE > 0 ? 1 : 0);
559
560         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor,
561                 InstallSnapshot.class);
562         assertEquals("InstallSnapshot getTerm", currentTerm, installSnapshot.getTerm());
563         assertEquals("InstallSnapshot getLeaderId", leaderId, installSnapshot.getLeaderId());
564         assertEquals("InstallSnapshot getChunkIndex", 1, installSnapshot.getChunkIndex());
565         assertEquals("InstallSnapshot getTotalChunks", expTotalChunks, installSnapshot.getTotalChunks());
566         assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
567         assertEquals("InstallSnapshot getLastIncludedIndex", lastAppliedIndex, installSnapshot.getLastIncludedIndex());
568         //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
569
570         List<InstallSnapshotReply> installSnapshotReplies = MessageCollectorActor.expectMatching(
571                 leaderCollectorActor, InstallSnapshotReply.class, expTotalChunks);
572         int index = 1;
573         for (InstallSnapshotReply installSnapshotReply: installSnapshotReplies) {
574             assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
575             assertEquals("InstallSnapshotReply getChunkIndex", index++, installSnapshotReply.getChunkIndex());
576             assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
577             assertEquals("InstallSnapshotReply isSuccess", true, installSnapshotReply.isSuccess());
578         }
579
580         // Verify follower 2 applies the snapshot.
581         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor,
582                 ApplySnapshot.class);
583         verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, lastAppliedIndex, currentTerm,
584                 lastAppliedIndex);
585         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0,
586                 applySnapshot.getSnapshot().getUnAppliedEntries().size());
587
588         // Wait for the snapshot to complete.
589         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
590
591         // Ensure there's at least 1 more heartbeat.
592         MessageCollectorActor.clearMessages(leaderCollectorActor);
593         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
594
595         // The leader should now have performed fake snapshots to advance the snapshot index and to trim
596         // the log. In addition replicatedToAllIndex should've advanced.
597         verifyLeadersTrimmedLog(lastAppliedIndex);
598
599         if (expServerConfig != null) {
600             Set<ServerInfo> expServerInfo = new HashSet<>(expServerConfig.getServerConfig());
601             assertEquals("Leader snapshot server config", expServerInfo,
602                     new HashSet<>(persistedSnapshot.getServerConfiguration().getServerConfig()));
603
604             assertEquals("Follower 2 snapshot server config", expServerInfo,
605                     new HashSet<>(applySnapshot.getSnapshot().getServerConfiguration().getServerConfig()));
606
607             ServerConfigurationPayload follower2ServerConfig = follower2Context.getPeerServerInfo(true);
608             assertNotNull("Follower 2 server config is null", follower2ServerConfig);
609
610             assertEquals("Follower 2 server config", expServerInfo,
611                     new HashSet<>(follower2ServerConfig.getServerConfig()));
612         }
613
614         MessageCollectorActor.clearMessages(leaderCollectorActor);
615         MessageCollectorActor.clearMessages(follower1CollectorActor);
616         MessageCollectorActor.clearMessages(follower2CollectorActor);
617
618         testLog.info("verifyInstallSnapshotToLaggingFollower complete");
619     }
620
621     /**
622      * Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and
623      * snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower.
624      */
625     private long verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot() throws Exception {
626         testLog.info(
627                 "verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot starting: replicatedToAllIndex: {}",
628                 leader.getReplicatedToAllIndex());
629
630         // Send another payload - a snapshot should occur.
631         MockPayload payload4 = sendPayloadData(leaderActor, "four");
632
633         // Wait for the snapshot to complete.
634         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
635
636         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
637         verifyApplyState(applyState, leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
638
639         // Verify the leader's last persisted snapshot (previous ones may not be purged yet).
640         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
641         Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
642         // The last (fourth) payload may or may not have been applied when the snapshot is captured depending on the
643         // timing when the async persistence completes.
644         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
645         long leadersSnapshotIndex;
646         if (unAppliedEntry.isEmpty()) {
647             leadersSnapshotIndex = 4;
648             expSnapshotState.add(payload4);
649             verifySnapshot("Persisted", persistedSnapshot, currentTerm, 4, currentTerm, 4);
650         } else {
651             leadersSnapshotIndex = 3;
652             verifySnapshot("Persisted", persistedSnapshot, currentTerm, 3, currentTerm, 4);
653             assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
654             verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4);
655             expSnapshotState.add(payload4);
656         }
657
658         // Send a couple more payloads.
659         MockPayload payload5 = sendPayloadData(leaderActor, "five");
660         MockPayload payload6 = sendPayloadData(leaderActor, "six");
661
662         // Verify the leader applies the 2 log entries.
663         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
664         verifyApplyState(applyStates.get(1), leaderCollectorActor, payload5.toString(), currentTerm, 5, payload5);
665         verifyApplyState(applyStates.get(2), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
666
667         // Verify the leader applies a log entry for at least the last entry index.
668         verifyApplyJournalEntries(leaderCollectorActor, 6);
669
670         // Ensure there's at least 1 more heartbeat to trim the log.
671         MessageCollectorActor.clearMessages(leaderCollectorActor);
672         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
673
674         // Verify the leader's final state.
675         verifyLeadersTrimmedLog(6);
676
677         InMemoryJournal.dumpJournal(leaderId);
678
679         // Verify the leaders's persisted journal log - it should only contain the last 2 ReplicatedLogEntries
680         // added after the snapshot as the persisted journal should've been purged to the snapshot
681         // sequence number.
682         verifyPersistedJournal(leaderId, Arrays.asList(new SimpleReplicatedLogEntry(5, currentTerm, payload5),
683                 new SimpleReplicatedLogEntry(6, currentTerm, payload6)));
684
685         // Verify the leaders's persisted journal contains an ApplyJournalEntries for at least the last entry index.
686         List<ApplyJournalEntries> persistedApplyJournalEntries =
687                 InMemoryJournal.get(leaderId, ApplyJournalEntries.class);
688         boolean found = false;
689         for (ApplyJournalEntries entry: persistedApplyJournalEntries) {
690             if (entry.getToIndex() == 6) {
691                 found = true;
692                 break;
693             }
694         }
695
696         Assert.assertTrue(String.format("ApplyJournalEntries with index %d not found in leader's persisted journal", 6),
697                 found);
698
699         // Verify follower 1 applies the 3 log entries.
700         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
701         verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
702         verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
703         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
704
705         // Verify follower 1's log state.
706         verifyFollowersTrimmedLog(1, follower1Actor, 6);
707
708         // Verify follower 2 applies the 3 log entries.
709         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3);
710         verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
711         verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
712         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
713
714         // Verify follower 2's log state.
715         verifyFollowersTrimmedLog(2, follower2Actor, 6);
716
717         expSnapshotState.add(payload5);
718         expSnapshotState.add(payload6);
719
720         testLog.info("verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot ending");
721
722         return leadersSnapshotIndex;
723     }
724
725     /**
726      * Kill the leader actor, reinstate it and verify the recovered journal.
727      */
728     private void verifyLeaderRecoveryAfterReinstatement(long lastIndex, long snapshotIndex,
729             long firstJournalEntryIndex) {
730         testLog.info("verifyLeaderRecoveryAfterReinstatement starting: lastIndex: {}, snapshotIndex: {}, "
731             + "firstJournalEntryIndex: {}", lastIndex, snapshotIndex, firstJournalEntryIndex);
732
733         killActor(leaderActor);
734
735         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
736         TestRaftActor testRaftActor = leaderActor.underlyingActor();
737
738         testRaftActor.startDropMessages(RequestVoteReply.class);
739
740         leaderContext = testRaftActor.getRaftActorContext();
741
742         testRaftActor.waitForRecoveryComplete();
743
744         int logSize = (int) (expSnapshotState.size() - firstJournalEntryIndex);
745         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
746         assertEquals("Leader snapshot index", snapshotIndex, leaderContext.getReplicatedLog().getSnapshotIndex());
747         assertEquals("Leader journal log size", logSize, leaderContext.getReplicatedLog().size());
748         assertEquals("Leader journal last index", lastIndex, leaderContext.getReplicatedLog().lastIndex());
749         assertEquals("Leader commit index", lastIndex, leaderContext.getCommitIndex());
750         assertEquals("Leader last applied", lastIndex, leaderContext.getLastApplied());
751
752         for (long i = firstJournalEntryIndex; i < expSnapshotState.size(); i++) {
753             verifyReplicatedLogEntry(leaderContext.getReplicatedLog().get(i), currentTerm, i,
754                     expSnapshotState.get((int) i));
755         }
756
757         assertEquals("Leader applied state", expSnapshotState, testRaftActor.getState());
758
759         testLog.info("verifyLeaderRecoveryAfterReinstatement ending");
760     }
761
762     private void sendInitialPayloadsReplicatedToAllFollowers(String... data) {
763
764         // Send the payloads.
765         for (String d: data) {
766             expSnapshotState.add(sendPayloadData(leaderActor, d));
767         }
768
769         int numEntries = data.length;
770
771         // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
772         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor,
773                 ApplyState.class, numEntries);
774         for (int i = 0; i < expSnapshotState.size(); i++) {
775             MockPayload payload = expSnapshotState.get(i);
776             verifyApplyState(applyStates.get(i), leaderCollectorActor, payload.toString(), currentTerm, i, payload);
777         }
778
779         // Verify follower 1 applies each log entry.
780         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, numEntries);
781         for (int i = 0; i < expSnapshotState.size(); i++) {
782             MockPayload payload = expSnapshotState.get(i);
783             verifyApplyState(applyStates.get(i), null, null, currentTerm, i, payload);
784         }
785
786         // Verify follower 2 applies each log entry.
787         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, numEntries);
788         for (int i = 0; i < expSnapshotState.size(); i++) {
789             MockPayload payload = expSnapshotState.get(i);
790             verifyApplyState(applyStates.get(i), null, null, currentTerm, i, payload);
791         }
792
793         // Ensure there's at least 1 more heartbeat.
794         MessageCollectorActor.clearMessages(leaderCollectorActor);
795         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
796
797         // The leader should have performed fake snapshots to trim the log to the last index replicated to
798         // all followers.
799         verifyLeadersTrimmedLog(numEntries - 1);
800
801         MessageCollectorActor.clearMessages(leaderCollectorActor);
802         MessageCollectorActor.clearMessages(follower1CollectorActor);
803         MessageCollectorActor.clearMessages(follower2CollectorActor);
804     }
805 }