Eliminate use of deprecated mockito methods
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / ReplicationAndSnapshotsIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11
12 import akka.persistence.SaveSnapshotSuccess;
13 import com.google.common.collect.ImmutableMap;
14 import java.util.List;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
17 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
18 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
19 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
20 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
21 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
22 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
23 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
24 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
25 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
26 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
27 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
28
29 /**
30  * Tests replication and snapshots end-to-end using real RaftActors and behavior communication.
31  *
32  * @author Thomas Pantelis
33  */
34 public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorIntegrationTest {
35
36     private List<SimpleReplicatedLogEntry> origLeaderJournal; // log entries seeded into the leader's journal, captured in runTest()
37
       // Payloads seeded into the leader's persistent journal by runTest() before the actors are created
       // (log indexes 0-2).
38     private MockPayload recoveredPayload0;
39     private MockPayload recoveredPayload1;
40     private MockPayload recoveredPayload2;
       // Payloads sent to the running leader by the later test phases (verified at log indexes 3-7).
41     private MockPayload payload3;
42     private MockPayload payload4;
43     private MockPayload payload5;
44     private MockPayload payload6;
45     private MockPayload payload7;
46
47     @Test
48     public void runTest() {
           // Single end-to-end scenario: seed the leader's journal, create two followers and the leader,
           // then run the verification phases in a fixed order. The phases communicate through fields
           // (payloads, contexts, collector actors), so each phase depends on the ones before it.
49         testLog.info("testReplicationAndSnapshots starting");
50
51         // Setup the persistent journal for the leader. We'll start up with 3 journal log entries (one less
52         // than the snapshotBatchCount).
53         long seqId = 1;
54         InMemoryJournal.addEntry(leaderId, seqId++, new UpdateElectionTerm(initialTerm, leaderId));
55         recoveredPayload0 = new MockPayload("zero");
56         InMemoryJournal.addEntry(leaderId, seqId++, new SimpleReplicatedLogEntry(0, initialTerm, recoveredPayload0));
57         recoveredPayload1 = new MockPayload("one");
58         InMemoryJournal.addEntry(leaderId, seqId++, new SimpleReplicatedLogEntry(1, initialTerm, recoveredPayload1));
59         recoveredPayload2 = new MockPayload("two");
60         InMemoryJournal.addEntry(leaderId, seqId++, new SimpleReplicatedLogEntry(2, initialTerm, recoveredPayload2));
           // NOTE(review): presumably records that entries through index 2 were already applied - confirm
           // against ApplyJournalEntries.
61         InMemoryJournal.addEntry(leaderId, seqId++, new ApplyJournalEntries(2));
62
           // Capture the seeded log entries now; verifyLeaderRecoveryAndInitialization() compares the
           // followers' persisted journals against this list.
63         origLeaderJournal = InMemoryJournal.get(leaderId, SimpleReplicatedLogEntry.class);
64
65         // Create the leader and 2 follower actors and verify initial syncing of the followers after leader
66         // persistence recovery.
67
           // Both followers share one config instance that uses the same snapshotBatchCount value.
68         DefaultConfigParamsImpl followerConfigParams = newFollowerConfigParams();
69         followerConfigParams.setSnapshotBatchCount(snapshotBatchCount);
70         follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
71                 follower2Id, testActorPath(follower2Id)), followerConfigParams);
72
73         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
74                 follower1Id, testActorPath(follower1Id)), followerConfigParams);
75
           // The followers are created first so that the leader's peer map can be built from their actual
           // actor paths.
76         peerAddresses = ImmutableMap.<String, String>builder()
77                 .put(follower1Id, follower1Actor.path().toString())
78                 .put(follower2Id, follower2Actor.path().toString()).build();
79
80         leaderConfigParams = newLeaderConfigParams();
81         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
82
           // The collector actors are what the phases below query, via MessageCollectorActor, for the
           // messages they expect each member to have seen.
83         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
84         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
85         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
86
87         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
88
           // Phases, in dependency order.
89         verifyLeaderRecoveryAndInitialization();
90
91         testFirstSnapshot();
92
93         testSubsequentReplications();
94
95         testSecondSnapshot();
96
97         testLeaderReinstatement();
98
99         testLog.info("testReplicationAndSnapshots ending");
100     }
101
102     /**
103      * Verify the expected leader is elected as the leader and verify initial syncing of the followers
104      * from the leader's persistence recovery.
         *
         * <p>Also records {@code currentTerm} and the leader's current behavior in fields for the later
         * phases, checks the leader's in-memory log state and the followers' persisted journals, and
         * leaves all three collector actors cleared.
105      */
106     void verifyLeaderRecoveryAndInitialization() {
107         testLog.info("verifyLeaderRecoveryAndInitialization starting");
108
109         waitUntilLeader(leaderActor);
110
            // The term is expected to have advanced beyond the initialTerm that was seeded into the journal.
111         currentTerm = leaderContext.getTermInformation().getCurrentTerm();
112         assertEquals("Current term > " + initialTerm, true, currentTerm > initialTerm);
113
114         leader = leaderActor.underlyingActor().getCurrentBehavior();
115
116         // The followers should receive AppendEntries for each leader log entry that was recovered from
117         // persistence and apply each one.
118         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(
119                 follower1CollectorActor, ApplyState.class, 3);
120         verifyApplyState(applyStates.get(0), null, null, initialTerm, 0, recoveredPayload0);
121         verifyApplyState(applyStates.get(1), null, null, initialTerm, 1, recoveredPayload1);
122         verifyApplyState(applyStates.get(2), null, null, initialTerm, 2, recoveredPayload2);
123
124         // Verify follower 1 applies a log entry for at least the last entry index.
125         verifyApplyJournalEntries(follower1CollectorActor, 2);
126
127         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3);
128         verifyApplyState(applyStates.get(0), null, null, initialTerm, 0, recoveredPayload0);
129         verifyApplyState(applyStates.get(1), null, null, initialTerm, 1, recoveredPayload1);
130         verifyApplyState(applyStates.get(2), null, null, initialTerm, 2, recoveredPayload2);
131
132         // Verify follower 2 applies a log entry for at least the last entry index.
133         verifyApplyJournalEntries(follower2CollectorActor, 2);
134
135         MessageCollectorActor.clearMessages(leaderCollectorActor);
136         MessageCollectorActor.clearMessages(follower1CollectorActor);
137         MessageCollectorActor.clearMessages(follower2CollectorActor);
138
139         // The leader should have performed fake snapshots due to the follower's AppendEntriesReplies and
140         // trimmed the in-memory log so that only the last entry remains.
141         assertEquals("Leader snapshot term", initialTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
142         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
143         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
144         assertEquals("Leader journal last index", 2, leaderContext.getReplicatedLog().lastIndex());
145         assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
146         assertEquals("Leader last applied", 2, leaderContext.getLastApplied());
147         assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
148
149         // Verify the follower's persisted journal log.
150         verifyPersistedJournal(follower1Id, origLeaderJournal);
151         verifyPersistedJournal(follower2Id, origLeaderJournal);
152
            // NOTE(review): the collectors were already cleared above; this second clear presumably discards
            // anything that arrived in the meantime so the next phase starts from empty collectors - confirm
            // whether both clears are needed.
153         MessageCollectorActor.clearMessages(leaderCollectorActor);
154         MessageCollectorActor.clearMessages(follower1CollectorActor);
155         MessageCollectorActor.clearMessages(follower2CollectorActor);
156
157         testLog.info("verifyLeaderRecoveryAndInitialization ending");
158     }
159
160     /**
161      * Send a payload to the TestRaftActor to persist and replicate. Since snapshotBatchCount is set to
162      * 4 and we already have 3 entries in the journal log, this should initiate a snapshot. In this
163      * scenario, the follower consensus and application of state is delayed until after the snapshot
164      * completes.
         *
         * <p>Stores the sent payload in {@code payload3} and leaves all three collector actors cleared.
165      */
166     private void testFirstSnapshot() {
167         testLog.info("testFirstSnapshot starting");
168
            // The three recovered payloads are the state expected in the first snapshot.
            // NOTE(review): expSnapshotState is presumably consumed by verifySnapshot() - confirm in the
            // base class.
169         expSnapshotState.add(recoveredPayload0);
170         expSnapshotState.add(recoveredPayload1);
171         expSnapshotState.add(recoveredPayload2);
172
173         // Delay the consensus by temporarily dropping the AppendEntries to both followers.
174         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
175         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
176
177         // Send the payload.
178         payload3 = sendPayloadData(leaderActor, "three");
179
180         // Wait for snapshot complete.
181         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
182
183         // The snapshot index should not be advanced nor the log trimmed because replicatedToAllIndex
184         // is behind due the followers not being replicated yet via AppendEntries.
185         assertEquals("Leader snapshot term", initialTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
186         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
187         assertEquals("Leader journal log size", 2, leaderContext.getReplicatedLog().size());
188         assertEquals("Leader journal last index", 3, leaderContext.getReplicatedLog().lastIndex());
189
190         // Verify the persisted snapshot in the leader. This should reflect the advanced snapshot index as
191         // the last applied log entry (2) even though the leader hasn't yet advanced its cached snapshot index.
192         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
193         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
194         verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3);
            // Entry 3 (payload3) has not reached consensus yet, so it is expected as the single unapplied entry.
195         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
196         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
197         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
198
199         // The leader's persisted journal log should be cleared since we snapshotted.
200         List<SimpleReplicatedLogEntry> persistedLeaderJournal =
201                 InMemoryJournal.get(leaderId, SimpleReplicatedLogEntry.class);
202         assertEquals("Persisted journal log size", 0, persistedLeaderJournal.size());
203
204         // Allow AppendEntries to both followers to proceed. This should catch up the followers and cause a
205         // "fake" snapshot in the leader to advance the snapshot index to 2. Also the state should be applied
206         // in all members (via ApplyState).
207         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
208         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
209
210         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
211         verifyApplyState(applyState, leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
212
213         verifyApplyJournalEntries(leaderCollectorActor, 3);
214
215         assertEquals("Leader commit index", 3, leaderContext.getCommitIndex());
216
217         applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
218         verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
219
220         verifyApplyJournalEntries(follower1CollectorActor, 3);
221
222         applyState = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
223         verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
224
225         verifyApplyJournalEntries(follower2CollectorActor, 3);
226
            // With both followers caught up, the leader's cached snapshot index and replicatedToAllIndex are
            // expected to have advanced to 2, leaving only entry 3 in the in-memory log.
227         assertEquals("Leader snapshot term", initialTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
228         assertEquals("Leader snapshot index", 2, leaderContext.getReplicatedLog().getSnapshotIndex());
229         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
230         assertEquals("Leader commit index", 3, leaderContext.getCommitIndex());
231         assertEquals("Leader last applied", 3, leaderContext.getLastApplied());
232         assertEquals("Leader replicatedToAllIndex", 2, leader.getReplicatedToAllIndex());
233
234         // The followers should also snapshot so verify.
235
236         MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SaveSnapshotSuccess.class);
237         persistedSnapshots = InMemorySnapshotStore.getSnapshots(follower1Id, Snapshot.class);
238         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
239         // The last applied index in the snapshot may or may not be the last log entry depending on
240         // timing so to avoid intermittent test failures, we'll just verify the snapshot's last term/index.
241         assertEquals("Follower1 Snapshot getLastTerm", currentTerm, persistedSnapshots.get(0).getLastTerm());
242         assertEquals("Follower1 Snapshot getLastIndex", 3, persistedSnapshots.get(0).getLastIndex());
243
            // For follower 2 only completion of the snapshot is awaited; its persisted snapshot is not inspected.
244         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
245
246         MessageCollectorActor.clearMessages(leaderCollectorActor);
247         MessageCollectorActor.clearMessages(follower1CollectorActor);
248         MessageCollectorActor.clearMessages(follower2CollectorActor);
249
250         testLog.info("testFirstSnapshot ending");
251     }
252
253     /**
254      * Send 3 more payload instances and verify they get applied by all members.
         *
         * <p>Stores the payloads in {@code payload4} through {@code payload6}. Only the leader's collector
         * is cleared at the end; the follower collectors keep their ApplyState messages for entries 4-6,
         * which testSecondSnapshot() expects to find together with entry 7.
255      */
256     private void testSubsequentReplications() {
257         testLog.info("testSubsequentReplications starting");
258
259         payload4 = sendPayloadData(leaderActor, "four");
260         payload5 = sendPayloadData(leaderActor, "five");
261         payload6 = sendPayloadData(leaderActor, "six");
262
263         // Verify the leader applies the states.
264         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
265         verifyApplyState(applyStates.get(0), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
266         verifyApplyState(applyStates.get(1), leaderCollectorActor, payload5.toString(), currentTerm, 5, payload5);
267         verifyApplyState(applyStates.get(2), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
268
269         // Verify the leader applies a log entry for at least the last entry index.
270         verifyApplyJournalEntries(leaderCollectorActor, 6);
271
272         // The leader should have performed fake snapshots due to the follower's AppendEntriesReplies and
273         // trimmed the in-memory log so that only the last entry remains.
274         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
275         assertEquals("Leader snapshot index", 5, leaderContext.getReplicatedLog().getSnapshotIndex());
276         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
277         assertEquals("Leader journal last index", 6, leaderContext.getReplicatedLog().lastIndex());
278         assertEquals("Leader commit index", 6, leaderContext.getCommitIndex());
279         assertEquals("Leader last applied", 6, leaderContext.getLastApplied());
280         assertEquals("Leader replicatedToAllIndex", 5, leader.getReplicatedToAllIndex());
281
282         // Verify follower 1 applies the states.
283         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
284         verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
285         verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
286         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
287
288         // Verify follower 1 applies a log entry for at least the last entry index.
289         verifyApplyJournalEntries(follower1CollectorActor, 6);
290
291         // Verify follower 2 applies the states.
292         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3);
293         verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
294         verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
295         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
296
297         // Verify follower 2 applies a log entry for at least the last entry index.
298         verifyApplyJournalEntries(follower2CollectorActor, 6);
299
            // Deliberately clear only the leader's collector here; the follower collectors are left as they
            // are for testSecondSnapshot().
300         MessageCollectorActor.clearMessages(leaderCollectorActor);
301
302         testLog.info("testSubsequentReplications ending");
303     }
304
305     /**
306      * Send one more payload to trigger another snapshot. In this scenario, we delay the snapshot until
307      * consensus occurs and the leader applies the state.
         *
         * <p>The delay is achieved by dropping the leader's CaptureSnapshotReply and re-sending the captured
         * message later. Stores the sent payload in {@code payload7}, and sets {@code follower1Context}
         * and {@code follower2Context}.
308      */
309     private void testSecondSnapshot() {
310         testLog.info("testSecondSnapshot starting");
311
            // NOTE(review): expSnapshotState is presumably consumed by verifySnapshot() - confirm in the
            // base class.
312         expSnapshotState.add(payload3);
313         expSnapshotState.add(payload4);
314         expSnapshotState.add(payload5);
315         expSnapshotState.add(payload6);
316
317         // Delay the CaptureSnapshotReply message to the leader actor by dropping it for now.
318         leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
319
320         // Send the payload.
321         payload7 = sendPayloadData(leaderActor, "seven");
322
323         // Capture the CaptureSnapshotReply message so we can send it later.
324         final CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
325                 leaderCollectorActor, CaptureSnapshotReply.class);
326
327         // Wait for the state to be applied in the leader.
328         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
329         verifyApplyState(applyState, leaderCollectorActor, payload7.toString(), currentTerm, 7, payload7);
330
331         // At this point the leader has applied the new state but the cached snapshot index should not be
332         // advanced by a "fake" snapshot because we're in the middle of a snapshot. We'll wait for at least
333         // one more heartbeat AppendEntriesReply to ensure this does not occur.
334         MessageCollectorActor.clearMessages(leaderCollectorActor);
335         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
336
337         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
338         assertEquals("Leader snapshot index", 5, leaderContext.getReplicatedLog().getSnapshotIndex());
339         assertEquals("Leader journal log size", 2, leaderContext.getReplicatedLog().size());
340         assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
341         assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
342         assertEquals("Leader last applied", 7, leaderContext.getLastApplied());
343         assertEquals("Leader replicatedToAllIndex", 5, leader.getReplicatedToAllIndex());
344
345         // Now deliver the CaptureSnapshotReply.
346         leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
347         leaderActor.tell(captureSnapshotReply, leaderActor);
348
349         // Wait for snapshot complete.
350         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
351
352         // Wait for another heartbeat AppendEntriesReply. This should cause a "fake" snapshot to advance the
353         // snapshot index and trimmed the log since we're no longer in a snapshot.
354         MessageCollectorActor.clearMessages(leaderCollectorActor);
355         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
356         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
357         assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex());
358         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
359         assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
360         assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
361
362         // Verify the persisted snapshot. The assertions below expect last applied index 6 with entry 7
363         // carried as the single unapplied entry, which suggests the snapshot content was captured when the
364         // snapshot was initiated (presumably when CaptureSnapshot was processed), before entry 7 was applied.
365         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
366         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
367         verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7);
368         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
369         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
370         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
371
372         // The leader's persisted journal log should be cleared since we did a snapshot.
373         List<SimpleReplicatedLogEntry> persistedLeaderJournal = InMemoryJournal.get(
374                 leaderId, SimpleReplicatedLogEntry.class);
375         assertEquals("Persisted journal log size", 0, persistedLeaderJournal.size());
376
377         // Verify the followers apply all 4 new log entries.
            // Entries 4-6 are still in the follower collectors because testSubsequentReplications() cleared
            // only the leader's collector.
378         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor,
379                 ApplyState.class, 4);
380         verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
381         verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
382         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
383         verifyApplyState(applyStates.get(3), null, null, currentTerm, 7, payload7);
384
385         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 4);
386         verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
387         verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
388         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
389         verifyApplyState(applyStates.get(3), null, null, currentTerm, 7, payload7);
390
391         // Verify the follower's snapshot index has also advanced. (after another AppendEntries heartbeat
392         // to be safe).
393
394         MessageCollectorActor.clearMessages(follower1CollectorActor);
395         MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
396         follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
397         assertEquals("Follower 1 snapshot term", currentTerm, follower1Context.getReplicatedLog().getSnapshotTerm());
398         assertEquals("Follower 1 snapshot index", 6, follower1Context.getReplicatedLog().getSnapshotIndex());
399         assertEquals("Follower 1 journal log size", 1, follower1Context.getReplicatedLog().size());
400         assertEquals("Follower 1 journal last index", 7, follower1Context.getReplicatedLog().lastIndex());
401         assertEquals("Follower 1 commit index", 7, follower1Context.getCommitIndex());
402
403         MessageCollectorActor.clearMessages(follower2CollectorActor);
404         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, AppendEntries.class);
405         follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
406         assertEquals("Follower 2 snapshot term", currentTerm, follower2Context.getReplicatedLog().getSnapshotTerm());
407         assertEquals("Follower 2 snapshot index", 6, follower2Context.getReplicatedLog().getSnapshotIndex());
408         assertEquals("Follower 2 journal log size", 1, follower2Context.getReplicatedLog().size());
409         assertEquals("Follower 2 journal last index", 7, follower2Context.getReplicatedLog().lastIndex());
410         assertEquals("Follower 2 commit index", 7, follower2Context.getCommitIndex());
411
            // payload7 is added to the expected state only now, after the persisted snapshot was verified above.
412         expSnapshotState.add(payload7);
413
414         testLog.info("testSecondSnapshot ending");
415     }
416
417     /**
418      * Kill the leader actor, reinstate it and verify the recovered journal.
         *
         * <p>The new actor is created with the same id, peer addresses and config params as the original
         * leader. The recovered values are expected to equal the leader state asserted at the end of
         * testSecondSnapshot(): snapshot index 6 with entry 7 (payload7) as the only log entry.
419      */
420     private void testLeaderReinstatement() {
421         testLog.info("testLeaderReinstatement starting");
422
423         killActor(leaderActor);
424
425         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
426
            // Only completion of recovery is awaited here; the method does not wait for the actor to
            // become leader again.
427         leaderActor.underlyingActor().waitForRecoveryComplete();
428
            // Re-fetch the context from the new actor instance rather than reusing the killed actor's context.
429         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
430
431         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
432         assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex());
433         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
434         assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
435         assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
436         assertEquals("Leader last applied", 7, leaderContext.getLastApplied());
437         verifyReplicatedLogEntry(leaderContext.getReplicatedLog().last(), currentTerm, 7, payload7);
438
439         testLog.info("testLeaderReinstatement ending");
440     }
441 }