e48234c0a3b5328244b1242971b6b96a41cc0c15
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / ReplicationAndSnapshotsIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import akka.persistence.SaveSnapshotSuccess;
12 import com.google.common.collect.ImmutableMap;
13 import java.util.List;
14 import org.junit.Test;
15 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
16 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
17 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
18 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
19 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
20 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
21 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
22 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
23 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
24 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
25
26 /**
27  * Tests replication and snapshots end-to-end using real RaftActors and behavior communication.
28  *
29  * @author Thomas Pantelis
30  */
31 public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorIntegrationTest {
32
33     private List<ReplicatedLogImplEntry> origLeaderJournal;
34
35     private MockPayload recoveredPayload0;
36     private MockPayload recoveredPayload1;
37     private MockPayload recoveredPayload2;
38     private MockPayload payload3;
39     private MockPayload payload4;
40     private MockPayload payload5;
41     private MockPayload payload6;
42     private MockPayload payload7;
43
44     @Test
45     public void runTest() throws Exception {
46         testLog.info("testReplicationAndSnapshots starting");
47
48         // Setup the persistent journal for the leader. We'll start up with 3 journal log entries (one less
49         // than the snapshotBatchCount).
50         long seqId = 1;
51         InMemoryJournal.addEntry(leaderId, seqId++, new UpdateElectionTerm(initialTerm, leaderId));
52         recoveredPayload0 = new MockPayload("zero");
53         InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(0, initialTerm, recoveredPayload0));
54         recoveredPayload1 = new MockPayload("one");
55         InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(1, initialTerm, recoveredPayload1));
56         recoveredPayload2 = new MockPayload("two");
57         InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(2, initialTerm, recoveredPayload2));
58         InMemoryJournal.addEntry(leaderId, seqId++, new ApplyJournalEntries(2));
59
60         origLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
61
62         // Create the leader and 2 follower actors and verify initial syncing of the followers after leader
63         // persistence recovery.
64
65         DefaultConfigParamsImpl followerConfigParams = newFollowerConfigParams();
66         followerConfigParams.setSnapshotBatchCount(snapshotBatchCount);
67         follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
68                 follower2Id, testActorPath(follower2Id)), followerConfigParams);
69
70         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
71                 follower1Id, testActorPath(follower1Id)), followerConfigParams);
72
73         peerAddresses = ImmutableMap.<String, String>builder().
74                 put(follower1Id, follower1Actor.path().toString()).
75                 put(follower2Id, follower2Actor.path().toString()).build();
76
77         leaderConfigParams = newLeaderConfigParams();
78         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
79
80         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
81         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
82         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
83
84         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
85
86         verifyLeaderRecoveryAndInitialization();
87
88         testFirstSnapshot();
89
90         testSubsequentReplications();
91
92         testSecondSnapshot();
93
94         testLeaderReinstatement();
95
96         testLog.info("testReplicationAndSnapshots ending");
97     }
98
99     /**
100      * Verify the expected leader is elected as the leader and verify initial syncing of the followers
101      * from the leader's persistence recovery.
102      */
103     void verifyLeaderRecoveryAndInitialization() {
104         testLog.info("verifyLeaderRecoveryAndInitialization starting");
105
106         waitUntilLeader(leaderActor);
107
108         currentTerm = leaderContext.getTermInformation().getCurrentTerm();
109         assertEquals("Current term > " + initialTerm, true, currentTerm > initialTerm);
110
111         leader = leaderActor.underlyingActor().getCurrentBehavior();
112
113         // The followers should receive AppendEntries for each leader log entry that was recovered from
114         // persistence and apply each one.
115         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(
116                 follower1CollectorActor, ApplyState.class, 3);
117         verifyApplyState(applyStates.get(0), null, null, initialTerm, 0, recoveredPayload0);
118         verifyApplyState(applyStates.get(1), null, null, initialTerm, 1, recoveredPayload1);
119         verifyApplyState(applyStates.get(2), null, null, initialTerm, 2, recoveredPayload2);
120
121         // Verify follower 1 applies a log entry for at least the last entry index.
122         verifyApplyJournalEntries(follower1CollectorActor, 2);
123
124         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3);
125         verifyApplyState(applyStates.get(0), null, null, initialTerm, 0, recoveredPayload0);
126         verifyApplyState(applyStates.get(1), null, null, initialTerm, 1, recoveredPayload1);
127         verifyApplyState(applyStates.get(2), null, null, initialTerm, 2, recoveredPayload2);
128
129         // Verify follower 1]2 applies a log entry for at least the last entry index.
130         verifyApplyJournalEntries(follower2CollectorActor, 2);
131
132         MessageCollectorActor.clearMessages(leaderCollectorActor);
133         MessageCollectorActor.clearMessages(follower1CollectorActor);
134         MessageCollectorActor.clearMessages(follower2CollectorActor);
135
136         // The leader should have performed fake snapshots due to the follower's AppendEntriesReplies and
137         // trimmed the in-memory log so that only the last entry remains.
138         assertEquals("Leader snapshot term", initialTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
139         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
140         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
141         assertEquals("Leader journal last index", 2, leaderContext.getReplicatedLog().lastIndex());
142         assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
143         assertEquals("Leader last applied", 2, leaderContext.getLastApplied());
144         assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
145
146         // Verify the follower's persisted journal log.
147         verifyPersistedJournal(follower1Id, origLeaderJournal);
148         verifyPersistedJournal(follower2Id, origLeaderJournal);
149
150         MessageCollectorActor.clearMessages(leaderCollectorActor);
151         MessageCollectorActor.clearMessages(follower1CollectorActor);
152         MessageCollectorActor.clearMessages(follower2CollectorActor);
153
154         testLog.info("verifyLeaderRecoveryAndInitialization ending");
155     }
156
157     /**
158      * Send a payload to the TestRaftActor to persist and replicate. Since snapshotBatchCount is set to
159      * 4 and we already have 3 entries in the journal log, this should initiate a snapshot. In this
160      * scenario, the follower consensus and application of state is delayed until after the snapshot
161      * completes.
162      * @throws Exception
163      */
164     private void testFirstSnapshot() throws Exception {
165         testLog.info("testFirstSnapshot starting");
166
167         expSnapshotState.add(recoveredPayload0);
168         expSnapshotState.add(recoveredPayload1);
169         expSnapshotState.add(recoveredPayload2);
170
171         // Delay the consensus by temporarily dropping the AppendEntries to both followers.
172         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
173         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
174
175         // Send the payload.
176         payload3 = sendPayloadData(leaderActor, "three");
177
178         // Wait for snapshot complete.
179         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
180
181         // The snapshot index should not be advanced nor the log trimmed because replicatedToAllIndex
182         // is behind due the followers not being replicated yet via AppendEntries.
183         assertEquals("Leader snapshot term", initialTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
184         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
185         assertEquals("Leader journal log size", 2, leaderContext.getReplicatedLog().size());
186         assertEquals("Leader journal last index", 3, leaderContext.getReplicatedLog().lastIndex());
187
188         // Verify the persisted snapshot in the leader. This should reflect the advanced snapshot index as
189         // the last applied log entry (2) even though the leader hasn't yet advanced its cached snapshot index.
190         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
191         assertEquals("Persisted snapshots size", 2, persistedSnapshots.size());
192         verifySnapshot("Persisted", persistedSnapshots.get(1), initialTerm, 2, currentTerm, 3);
193         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(1).getUnAppliedEntries();
194         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
195         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
196
197         // The leader's persisted journal log should be cleared since we snapshotted.
198         List<ReplicatedLogImplEntry> persistedLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
199         assertEquals("Persisted journal log size", 0, persistedLeaderJournal.size());
200
201         // Allow AppendEntries to both followers to proceed. This should catch up the followers and cause a
202         // "fake" snapshot in the leader to advance the snapshot index to 2. Also the state should be applied
203         // in all members (via ApplyState).
204         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
205         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
206
207         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
208         verifyApplyState(applyState, leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
209
210         verifyApplyJournalEntries(leaderCollectorActor, 3);
211
212         assertEquals("Leader commit index", 3, leaderContext.getCommitIndex());
213
214         applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
215         verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
216
217         verifyApplyJournalEntries(follower1CollectorActor, 3);
218
219         applyState = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
220         verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
221
222         verifyApplyJournalEntries(follower2CollectorActor, 3);
223
224         assertEquals("Leader snapshot term", initialTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
225         assertEquals("Leader snapshot index", 2, leaderContext.getReplicatedLog().getSnapshotIndex());
226         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
227         assertEquals("Leader commit index", 3, leaderContext.getCommitIndex());
228         assertEquals("Leader last applied", 3, leaderContext.getLastApplied());
229         assertEquals("Leader replicatedToAllIndex", 2, leader.getReplicatedToAllIndex());
230
231         // The followers should also snapshot so verify.
232
233         MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SaveSnapshotSuccess.class);
234         persistedSnapshots = InMemorySnapshotStore.getSnapshots(follower1Id, Snapshot.class);
235         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
236         // The last applied index in the snapshot may or may not be the last log entry depending on
237         // timing so to avoid intermittent test failures, we'll just verify the snapshot's last term/index.
238         assertEquals("Follower1 Snapshot getLastTerm", currentTerm, persistedSnapshots.get(0).getLastTerm());
239         assertEquals("Follower1 Snapshot getLastIndex", 3, persistedSnapshots.get(0).getLastIndex());
240
241         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
242
243         MessageCollectorActor.clearMessages(leaderCollectorActor);
244         MessageCollectorActor.clearMessages(follower1CollectorActor);
245         MessageCollectorActor.clearMessages(follower2CollectorActor);
246
247         testLog.info("testFirstSnapshot ending");
248     }
249
250     /**
251      * Send 3 more payload instances and verify they get applied by all members.
252      */
253     private void testSubsequentReplications() {
254         testLog.info("testSubsequentReplications starting");
255
256         payload4 = sendPayloadData(leaderActor, "four");
257         payload5 = sendPayloadData(leaderActor, "five");
258         payload6 = sendPayloadData(leaderActor, "six");
259
260         // Verify the leader applies the states.
261         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
262         verifyApplyState(applyStates.get(0), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
263         verifyApplyState(applyStates.get(1), leaderCollectorActor, payload5.toString(), currentTerm, 5, payload5);
264         verifyApplyState(applyStates.get(2), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
265
266         // Verify the leader applies a log entry for at least the last entry index.
267         verifyApplyJournalEntries(leaderCollectorActor, 6);
268
269         // The leader should have performed fake snapshots due to the follower's AppendEntriesReplies and
270         // trimmed the in-memory log so that only the last entry remains.
271         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
272         assertEquals("Leader snapshot index", 5, leaderContext.getReplicatedLog().getSnapshotIndex());
273         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
274         assertEquals("Leader journal last index", 6, leaderContext.getReplicatedLog().lastIndex());
275         assertEquals("Leader commit index", 6, leaderContext.getCommitIndex());
276         assertEquals("Leader last applied", 6, leaderContext.getLastApplied());
277         assertEquals("Leader replicatedToAllIndex", 5, leader.getReplicatedToAllIndex());
278
279         // Verify follower 1 applies the states.
280         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
281         verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
282         verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
283         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
284
285         // Verify follower 1 applies a log entry for at least the last entry index.
286         verifyApplyJournalEntries(follower1CollectorActor, 6);
287
288         // Verify follower 2 applies the states.
289         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3);
290         verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
291         verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
292         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
293
294         // Verify follower 2 applies a log entry for at least the last entry index.
295         verifyApplyJournalEntries(follower2CollectorActor, 6);
296
297         MessageCollectorActor.clearMessages(leaderCollectorActor);
298
299         testLog.info("testSubsequentReplications ending");
300     }
301
302     /**
303      * Send one more payload to trigger another snapshot. In this scenario, we delay the snapshot until
304      * consensus occurs and the leader applies the state.
305      * @throws Exception
306      */
307     private void testSecondSnapshot() throws Exception {
308         testLog.info("testSecondSnapshot starting");
309
310         expSnapshotState.add(payload3);
311         expSnapshotState.add(payload4);
312         expSnapshotState.add(payload5);
313         expSnapshotState.add(payload6);
314
315         // Delay the CaptureSnapshot message to the leader actor.
316         leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
317
318         // Send the payload.
319         payload7 = sendPayloadData(leaderActor, "seven");
320
321         // Capture the CaptureSnapshotReply message so we can send it later.
322         CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor,
323                 CaptureSnapshotReply.class);
324
325         // Wait for the state to be applied in the leader.
326         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
327         verifyApplyState(applyState, leaderCollectorActor, payload7.toString(), currentTerm, 7, payload7);
328
329         // At this point the leader has applied the new state but the cached snapshot index should not be
330         // advanced by a "fake" snapshot because we're in the middle of a snapshot. We'll wait for at least
331         // one more heartbeat AppendEntriesReply to ensure this does not occur.
332         MessageCollectorActor.clearMessages(leaderCollectorActor);
333         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
334
335         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
336         assertEquals("Leader snapshot index", 5, leaderContext.getReplicatedLog().getSnapshotIndex());
337         assertEquals("Leader journal log size", 2, leaderContext.getReplicatedLog().size());
338         assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
339         assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
340         assertEquals("Leader last applied", 7, leaderContext.getLastApplied());
341         assertEquals("Leader replicatedToAllIndex", 5, leader.getReplicatedToAllIndex());
342
343         // Now deliver the CaptureSnapshotReply.
344         leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
345         leaderActor.tell(captureSnapshotReply, leaderActor);
346
347         // Wait for snapshot complete.
348         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
349
350         // Wait for another heartbeat AppendEntriesReply. This should cause a "fake" snapshot to advance the
351         // snapshot index and trimmed the log since we're no longer in a snapshot.
352         MessageCollectorActor.clearMessages(leaderCollectorActor);
353         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
354         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
355         assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex());
356         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
357         assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
358         assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
359
360         // Verify the persisted snapshot. This should reflect the snapshot index as the last applied
361         // log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data
362         // when the snapshot is created (ie when the CaptureSnapshot is processed).
363         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
364         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
365         verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7);
366         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
367         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
368         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
369
370         // The leader's persisted journal log should be cleared since we did a snapshot.
371         List<ReplicatedLogImplEntry> persistedLeaderJournal = InMemoryJournal.get(
372                 leaderId, ReplicatedLogImplEntry.class);
373         assertEquals("Persisted journal log size", 0, persistedLeaderJournal.size());
374
375         // Verify the followers apply all 4 new log entries.
376         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 4);
377         verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
378         verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
379         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
380         verifyApplyState(applyStates.get(3), null, null, currentTerm, 7, payload7);
381
382         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 4);
383         verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
384         verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
385         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
386         verifyApplyState(applyStates.get(3), null, null, currentTerm, 7, payload7);
387
388         // Verify the follower's snapshot index has also advanced. (after another AppendEntries heartbeat
389         // to be safe).
390
391         MessageCollectorActor.clearMessages(follower1CollectorActor);
392         MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
393         RaftActorContext follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
394         assertEquals("Follower 1 snapshot term", currentTerm, follower1Context.getReplicatedLog().getSnapshotTerm());
395         assertEquals("Follower 1 snapshot index", 6, follower1Context.getReplicatedLog().getSnapshotIndex());
396         assertEquals("Follower 1 journal log size", 1, follower1Context.getReplicatedLog().size());
397         assertEquals("Follower 1 journal last index", 7, follower1Context.getReplicatedLog().lastIndex());
398         assertEquals("Follower 1 commit index", 7, follower1Context.getCommitIndex());
399
400         MessageCollectorActor.clearMessages(follower2CollectorActor);
401         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, AppendEntries.class);
402         RaftActorContext follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
403         assertEquals("Follower 2 snapshot term", currentTerm, follower2Context.getReplicatedLog().getSnapshotTerm());
404         assertEquals("Follower 2 snapshot index", 6, follower2Context.getReplicatedLog().getSnapshotIndex());
405         assertEquals("Follower 2 journal log size", 1, follower2Context.getReplicatedLog().size());
406         assertEquals("Follower 2 journal last index", 7, follower2Context.getReplicatedLog().lastIndex());
407         assertEquals("Follower 2 commit index", 7, follower2Context.getCommitIndex());
408
409         expSnapshotState.add(payload7);
410
411         testLog.info("testSecondSnapshot ending");
412     }
413
414     /**
415      * Kill the leader actor, reinstate it and verify the recovered journal.
416      */
417     private void testLeaderReinstatement() {
418         testLog.info("testLeaderReinstatement starting");
419
420         killActor(leaderActor);
421
422         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
423
424         leaderActor.underlyingActor().waitForRecoveryComplete();
425
426         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
427
428         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
429         assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex());
430         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
431         assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
432         assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
433         assertEquals("Leader last applied", 7, leaderContext.getLastApplied());
434         verifyReplicatedLogEntry(leaderContext.getReplicatedLog().last(), currentTerm, 7, payload7);
435
436         testLog.info("testLeaderReinstatement ending");
437     }
438 }