386108f61529258a13fb91c5f83eac8c41c4d8f3
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / ReplicationAndSnapshotsIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import akka.persistence.SaveSnapshotSuccess;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
26
27 /**
28  * Tests replication and snapshots end-to-end using real RaftActors and behavior communication.
29  *
30  * @author Thomas Pantelis
31  */
32 public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorIntegrationTest {
33
34     private List<ReplicatedLogImplEntry> origLeaderJournal;
35
36     private MockPayload recoveredPayload0;
37     private MockPayload recoveredPayload1;
38     private MockPayload recoveredPayload2;
39     private MockPayload payload3;
40     private MockPayload payload4;
41     private MockPayload payload5;
42     private MockPayload payload6;
43     private MockPayload payload7;
44
45     @Test
46     public void runTest() throws Exception {
47         testLog.info("testReplicationAndSnapshots starting");
48
49         // Setup the persistent journal for the leader. We'll start up with 3 journal log entries (one less
50         // than the snapshotBatchCount).
51         long seqId = 1;
52         InMemoryJournal.addEntry(leaderId, seqId++, new UpdateElectionTerm(initialTerm, leaderId));
53         recoveredPayload0 = new MockPayload("zero");
54         InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(0, initialTerm, recoveredPayload0));
55         recoveredPayload1 = new MockPayload("one");
56         InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(1, initialTerm, recoveredPayload1));
57         recoveredPayload2 = new MockPayload("two");
58         InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(2, initialTerm, recoveredPayload2));
59         InMemoryJournal.addEntry(leaderId, seqId++, new ApplyJournalEntries(2));
60
61         origLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
62
63         // Create the leader and 2 follower actors and verify initial syncing of the followers after leader
64         // persistence recovery.
65
66         DefaultConfigParamsImpl followerConfigParams = newFollowerConfigParams();
67         followerConfigParams.setSnapshotBatchCount(snapshotBatchCount);
68         follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
69                 follower2Id, testActorPath(follower2Id)), followerConfigParams);
70
71         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
72                 follower1Id, testActorPath(follower1Id)), followerConfigParams);
73
74         peerAddresses = ImmutableMap.<String, String>builder()
75                 .put(follower1Id, follower1Actor.path().toString())
76                 .put(follower2Id, follower2Actor.path().toString()).build();
77
78         leaderConfigParams = newLeaderConfigParams();
79         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
80
81         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
82         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
83         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
84
85         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
86
87         verifyLeaderRecoveryAndInitialization();
88
89         testFirstSnapshot();
90
91         testSubsequentReplications();
92
93         testSecondSnapshot();
94
95         testLeaderReinstatement();
96
97         testLog.info("testReplicationAndSnapshots ending");
98     }
99
100     /**
101      * Verify the expected leader is elected as the leader and verify initial syncing of the followers
102      * from the leader's persistence recovery.
103      */
104     void verifyLeaderRecoveryAndInitialization() {
105         testLog.info("verifyLeaderRecoveryAndInitialization starting");
106
107         waitUntilLeader(leaderActor);
108
109         currentTerm = leaderContext.getTermInformation().getCurrentTerm();
110         assertEquals("Current term > " + initialTerm, true, currentTerm > initialTerm);
111
112         leader = leaderActor.underlyingActor().getCurrentBehavior();
113
114         // The followers should receive AppendEntries for each leader log entry that was recovered from
115         // persistence and apply each one.
116         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(
117                 follower1CollectorActor, ApplyState.class, 3);
118         verifyApplyState(applyStates.get(0), null, null, initialTerm, 0, recoveredPayload0);
119         verifyApplyState(applyStates.get(1), null, null, initialTerm, 1, recoveredPayload1);
120         verifyApplyState(applyStates.get(2), null, null, initialTerm, 2, recoveredPayload2);
121
122         // Verify follower 1 applies a log entry for at least the last entry index.
123         verifyApplyJournalEntries(follower1CollectorActor, 2);
124
125         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3);
126         verifyApplyState(applyStates.get(0), null, null, initialTerm, 0, recoveredPayload0);
127         verifyApplyState(applyStates.get(1), null, null, initialTerm, 1, recoveredPayload1);
128         verifyApplyState(applyStates.get(2), null, null, initialTerm, 2, recoveredPayload2);
129
130         // Verify follower 1]2 applies a log entry for at least the last entry index.
131         verifyApplyJournalEntries(follower2CollectorActor, 2);
132
133         MessageCollectorActor.clearMessages(leaderCollectorActor);
134         MessageCollectorActor.clearMessages(follower1CollectorActor);
135         MessageCollectorActor.clearMessages(follower2CollectorActor);
136
137         // The leader should have performed fake snapshots due to the follower's AppendEntriesReplies and
138         // trimmed the in-memory log so that only the last entry remains.
139         assertEquals("Leader snapshot term", initialTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
140         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
141         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
142         assertEquals("Leader journal last index", 2, leaderContext.getReplicatedLog().lastIndex());
143         assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
144         assertEquals("Leader last applied", 2, leaderContext.getLastApplied());
145         assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
146
147         // Verify the follower's persisted journal log.
148         verifyPersistedJournal(follower1Id, origLeaderJournal);
149         verifyPersistedJournal(follower2Id, origLeaderJournal);
150
151         MessageCollectorActor.clearMessages(leaderCollectorActor);
152         MessageCollectorActor.clearMessages(follower1CollectorActor);
153         MessageCollectorActor.clearMessages(follower2CollectorActor);
154
155         testLog.info("verifyLeaderRecoveryAndInitialization ending");
156     }
157
158     /**
159      * Send a payload to the TestRaftActor to persist and replicate. Since snapshotBatchCount is set to
160      * 4 and we already have 3 entries in the journal log, this should initiate a snapshot. In this
161      * scenario, the follower consensus and application of state is delayed until after the snapshot
162      * completes.
163      */
164     private void testFirstSnapshot() throws Exception {
165         testLog.info("testFirstSnapshot starting");
166
167         expSnapshotState.add(recoveredPayload0);
168         expSnapshotState.add(recoveredPayload1);
169         expSnapshotState.add(recoveredPayload2);
170
171         // Delay the consensus by temporarily dropping the AppendEntries to both followers.
172         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
173         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
174
175         // Send the payload.
176         payload3 = sendPayloadData(leaderActor, "three");
177
178         // Wait for snapshot complete.
179         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
180
181         // The snapshot index should not be advanced nor the log trimmed because replicatedToAllIndex
182         // is behind due the followers not being replicated yet via AppendEntries.
183         assertEquals("Leader snapshot term", initialTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
184         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
185         assertEquals("Leader journal log size", 2, leaderContext.getReplicatedLog().size());
186         assertEquals("Leader journal last index", 3, leaderContext.getReplicatedLog().lastIndex());
187
188         // Verify the persisted snapshot in the leader. This should reflect the advanced snapshot index as
189         // the last applied log entry (2) even though the leader hasn't yet advanced its cached snapshot index.
190         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
191         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
192         verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3);
193         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
194         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
195         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
196
197         // The leader's persisted journal log should be cleared since we snapshotted.
198         List<ReplicatedLogImplEntry> persistedLeaderJournal =
199                 InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
200         assertEquals("Persisted journal log size", 0, persistedLeaderJournal.size());
201
202         // Allow AppendEntries to both followers to proceed. This should catch up the followers and cause a
203         // "fake" snapshot in the leader to advance the snapshot index to 2. Also the state should be applied
204         // in all members (via ApplyState).
205         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
206         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
207
208         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
209         verifyApplyState(applyState, leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
210
211         verifyApplyJournalEntries(leaderCollectorActor, 3);
212
213         assertEquals("Leader commit index", 3, leaderContext.getCommitIndex());
214
215         applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
216         verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
217
218         verifyApplyJournalEntries(follower1CollectorActor, 3);
219
220         applyState = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
221         verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
222
223         verifyApplyJournalEntries(follower2CollectorActor, 3);
224
225         assertEquals("Leader snapshot term", initialTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
226         assertEquals("Leader snapshot index", 2, leaderContext.getReplicatedLog().getSnapshotIndex());
227         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
228         assertEquals("Leader commit index", 3, leaderContext.getCommitIndex());
229         assertEquals("Leader last applied", 3, leaderContext.getLastApplied());
230         assertEquals("Leader replicatedToAllIndex", 2, leader.getReplicatedToAllIndex());
231
232         // The followers should also snapshot so verify.
233
234         MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SaveSnapshotSuccess.class);
235         persistedSnapshots = InMemorySnapshotStore.getSnapshots(follower1Id, Snapshot.class);
236         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
237         // The last applied index in the snapshot may or may not be the last log entry depending on
238         // timing so to avoid intermittent test failures, we'll just verify the snapshot's last term/index.
239         assertEquals("Follower1 Snapshot getLastTerm", currentTerm, persistedSnapshots.get(0).getLastTerm());
240         assertEquals("Follower1 Snapshot getLastIndex", 3, persistedSnapshots.get(0).getLastIndex());
241
242         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
243
244         MessageCollectorActor.clearMessages(leaderCollectorActor);
245         MessageCollectorActor.clearMessages(follower1CollectorActor);
246         MessageCollectorActor.clearMessages(follower2CollectorActor);
247
248         testLog.info("testFirstSnapshot ending");
249     }
250
251     /**
252      * Send 3 more payload instances and verify they get applied by all members.
253      */
254     private void testSubsequentReplications() {
255         testLog.info("testSubsequentReplications starting");
256
257         payload4 = sendPayloadData(leaderActor, "four");
258         payload5 = sendPayloadData(leaderActor, "five");
259         payload6 = sendPayloadData(leaderActor, "six");
260
261         // Verify the leader applies the states.
262         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
263         verifyApplyState(applyStates.get(0), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
264         verifyApplyState(applyStates.get(1), leaderCollectorActor, payload5.toString(), currentTerm, 5, payload5);
265         verifyApplyState(applyStates.get(2), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
266
267         // Verify the leader applies a log entry for at least the last entry index.
268         verifyApplyJournalEntries(leaderCollectorActor, 6);
269
270         // The leader should have performed fake snapshots due to the follower's AppendEntriesReplies and
271         // trimmed the in-memory log so that only the last entry remains.
272         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
273         assertEquals("Leader snapshot index", 5, leaderContext.getReplicatedLog().getSnapshotIndex());
274         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
275         assertEquals("Leader journal last index", 6, leaderContext.getReplicatedLog().lastIndex());
276         assertEquals("Leader commit index", 6, leaderContext.getCommitIndex());
277         assertEquals("Leader last applied", 6, leaderContext.getLastApplied());
278         assertEquals("Leader replicatedToAllIndex", 5, leader.getReplicatedToAllIndex());
279
280         // Verify follower 1 applies the states.
281         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
282         verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
283         verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
284         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
285
286         // Verify follower 1 applies a log entry for at least the last entry index.
287         verifyApplyJournalEntries(follower1CollectorActor, 6);
288
289         // Verify follower 2 applies the states.
290         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3);
291         verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
292         verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
293         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
294
295         // Verify follower 2 applies a log entry for at least the last entry index.
296         verifyApplyJournalEntries(follower2CollectorActor, 6);
297
298         MessageCollectorActor.clearMessages(leaderCollectorActor);
299
300         testLog.info("testSubsequentReplications ending");
301     }
302
303     /**
304      * Send one more payload to trigger another snapshot. In this scenario, we delay the snapshot until
305      * consensus occurs and the leader applies the state.
306      */
307     private void testSecondSnapshot() throws Exception {
308         testLog.info("testSecondSnapshot starting");
309
310         expSnapshotState.add(payload3);
311         expSnapshotState.add(payload4);
312         expSnapshotState.add(payload5);
313         expSnapshotState.add(payload6);
314
315         // Delay the CaptureSnapshot message to the leader actor.
316         leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
317
318         // Send the payload.
319         payload7 = sendPayloadData(leaderActor, "seven");
320
321         // Capture the CaptureSnapshotReply message so we can send it later.
322         final CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
323                 leaderCollectorActor, CaptureSnapshotReply.class);
324
325         // Wait for the state to be applied in the leader.
326         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
327         verifyApplyState(applyState, leaderCollectorActor, payload7.toString(), currentTerm, 7, payload7);
328
329         // At this point the leader has applied the new state but the cached snapshot index should not be
330         // advanced by a "fake" snapshot because we're in the middle of a snapshot. We'll wait for at least
331         // one more heartbeat AppendEntriesReply to ensure this does not occur.
332         MessageCollectorActor.clearMessages(leaderCollectorActor);
333         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
334
335         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
336         assertEquals("Leader snapshot index", 5, leaderContext.getReplicatedLog().getSnapshotIndex());
337         assertEquals("Leader journal log size", 2, leaderContext.getReplicatedLog().size());
338         assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
339         assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
340         assertEquals("Leader last applied", 7, leaderContext.getLastApplied());
341         assertEquals("Leader replicatedToAllIndex", 5, leader.getReplicatedToAllIndex());
342
343         // Now deliver the CaptureSnapshotReply.
344         leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
345         leaderActor.tell(captureSnapshotReply, leaderActor);
346
347         // Wait for snapshot complete.
348         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
349
350         // Wait for another heartbeat AppendEntriesReply. This should cause a "fake" snapshot to advance the
351         // snapshot index and trimmed the log since we're no longer in a snapshot.
352         MessageCollectorActor.clearMessages(leaderCollectorActor);
353         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
354         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
355         assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex());
356         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
357         assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
358         assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
359
360         // Verify the persisted snapshot. This should reflect the snapshot index as the last applied
361         // log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data
362         // when the snapshot is created (ie when the CaptureSnapshot is processed).
363         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
364         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
365         verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7);
366         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
367         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
368         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
369
370         // The leader's persisted journal log should be cleared since we did a snapshot.
371         List<ReplicatedLogImplEntry> persistedLeaderJournal = InMemoryJournal.get(
372                 leaderId, ReplicatedLogImplEntry.class);
373         assertEquals("Persisted journal log size", 0, persistedLeaderJournal.size());
374
375         // Verify the followers apply all 4 new log entries.
376         List<ApplyState> applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor,
377                 ApplyState.class, 4);
378         verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
379         verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
380         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
381         verifyApplyState(applyStates.get(3), null, null, currentTerm, 7, payload7);
382
383         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 4);
384         verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
385         verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
386         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
387         verifyApplyState(applyStates.get(3), null, null, currentTerm, 7, payload7);
388
389         // Verify the follower's snapshot index has also advanced. (after another AppendEntries heartbeat
390         // to be safe).
391
392         MessageCollectorActor.clearMessages(follower1CollectorActor);
393         MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
394         follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
395         assertEquals("Follower 1 snapshot term", currentTerm, follower1Context.getReplicatedLog().getSnapshotTerm());
396         assertEquals("Follower 1 snapshot index", 6, follower1Context.getReplicatedLog().getSnapshotIndex());
397         assertEquals("Follower 1 journal log size", 1, follower1Context.getReplicatedLog().size());
398         assertEquals("Follower 1 journal last index", 7, follower1Context.getReplicatedLog().lastIndex());
399         assertEquals("Follower 1 commit index", 7, follower1Context.getCommitIndex());
400
401         MessageCollectorActor.clearMessages(follower2CollectorActor);
402         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, AppendEntries.class);
403         follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
404         assertEquals("Follower 2 snapshot term", currentTerm, follower2Context.getReplicatedLog().getSnapshotTerm());
405         assertEquals("Follower 2 snapshot index", 6, follower2Context.getReplicatedLog().getSnapshotIndex());
406         assertEquals("Follower 2 journal log size", 1, follower2Context.getReplicatedLog().size());
407         assertEquals("Follower 2 journal last index", 7, follower2Context.getReplicatedLog().lastIndex());
408         assertEquals("Follower 2 commit index", 7, follower2Context.getCommitIndex());
409
410         expSnapshotState.add(payload7);
411
412         testLog.info("testSecondSnapshot ending");
413     }
414
415     /**
416      * Kill the leader actor, reinstate it and verify the recovered journal.
417      */
418     private void testLeaderReinstatement() {
419         testLog.info("testLeaderReinstatement starting");
420
421         killActor(leaderActor);
422
423         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
424
425         leaderActor.underlyingActor().waitForRecoveryComplete();
426
427         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
428
429         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
430         assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex());
431         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
432         assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
433         assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
434         assertEquals("Leader last applied", 7, leaderContext.getLastApplied());
435         verifyReplicatedLogEntry(leaderContext.getReplicatedLog().last(), currentTerm, 7, payload7);
436
437         testLog.info("testLeaderReinstatement ending");
438     }
439 }