/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import akka.persistence.SaveSnapshotSuccess;
12 import com.google.common.collect.ImmutableMap;
13 import java.util.List;
14 import org.junit.Test;
15 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
16 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
17 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
18 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
19 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
20 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
21 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
22 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
23 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
24 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
25 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
/**
 * Tests replication and snapshots end-to-end using real RaftActors and behavior communication.
 *
 * @author Thomas Pantelis
 */
32 public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorIntegrationTest {
34 private List<ReplicatedLogImplEntry> origLeaderJournal;
36 private MockPayload recoveredPayload0;
37 private MockPayload recoveredPayload1;
38 private MockPayload recoveredPayload2;
39 private MockPayload payload3;
40 private MockPayload payload4;
41 private MockPayload payload5;
42 private MockPayload payload6;
43 private MockPayload payload7;
46 public void runTest() throws Exception {
47 testLog.info("testReplicationAndSnapshots starting");
49 // Setup the persistent journal for the leader. We'll start up with 3 journal log entries (one less
50 // than the snapshotBatchCount).
52 InMemoryJournal.addEntry(leaderId, seqId++, new UpdateElectionTerm(initialTerm, leaderId));
53 recoveredPayload0 = new MockPayload("zero");
54 InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(0, initialTerm, recoveredPayload0));
55 recoveredPayload1 = new MockPayload("one");
56 InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(1, initialTerm, recoveredPayload1));
57 recoveredPayload2 = new MockPayload("two");
58 InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(2, initialTerm, recoveredPayload2));
59 InMemoryJournal.addEntry(leaderId, seqId++, new ApplyJournalEntries(2));
61 origLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
63 // Create the leader and 2 follower actors and verify initial syncing of the followers after leader
64 // persistence recovery.
66 follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
67 follower2Id, testActorPath(follower2Id)), newFollowerConfigParams());
69 follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
70 follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
72 peerAddresses = ImmutableMap.<String, String>builder().
73 put(follower1Id, follower1Actor.path().toString()).
74 put(follower2Id, follower2Actor.path().toString()).build();
76 leaderConfigParams = newLeaderConfigParams();
77 leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
79 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
80 follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
81 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
83 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
85 verifyLeaderRecoveryAndInitialization();
89 testSubsequentReplications();
93 testLeaderReinstatement();
95 testLog.info("testReplicationAndSnapshots ending");
99 * Verify the expected leader is elected as the leader and verify initial syncing of the followers
100 * from the leader's persistence recovery.
102 void verifyLeaderRecoveryAndInitialization() {
103 testLog.info("verifyLeaderRecoveryAndInitialization starting");
105 waitUntilLeader(leaderActor);
107 currentTerm = leaderContext.getTermInformation().getCurrentTerm();
108 assertEquals("Current term > " + initialTerm, true, currentTerm > initialTerm);
110 leader = leaderActor.underlyingActor().getCurrentBehavior();
112 // The followers should receive AppendEntries for each leader log entry that was recovered from
113 // persistence and apply each one.
114 List<ApplyState> applyStates = MessageCollectorActor.expectMatching(
115 follower1CollectorActor, ApplyState.class, 3);
116 verifyApplyState(applyStates.get(0), null, null, initialTerm, 0, recoveredPayload0);
117 verifyApplyState(applyStates.get(1), null, null, initialTerm, 1, recoveredPayload1);
118 verifyApplyState(applyStates.get(2), null, null, initialTerm, 2, recoveredPayload2);
120 // Verify follower 1 applies a log entry for at least the last entry index.
121 verifyApplyJournalEntries(follower1CollectorActor, 2);
123 applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3);
124 verifyApplyState(applyStates.get(0), null, null, initialTerm, 0, recoveredPayload0);
125 verifyApplyState(applyStates.get(1), null, null, initialTerm, 1, recoveredPayload1);
126 verifyApplyState(applyStates.get(2), null, null, initialTerm, 2, recoveredPayload2);
128 // Verify follower 1]2 applies a log entry for at least the last entry index.
129 verifyApplyJournalEntries(follower2CollectorActor, 2);
131 MessageCollectorActor.clearMessages(leaderCollectorActor);
132 MessageCollectorActor.clearMessages(follower1CollectorActor);
133 MessageCollectorActor.clearMessages(follower2CollectorActor);
135 // The leader should have performed fake snapshots due to the follower's AppendEntriesReplies and
136 // trimmed the in-memory log so that only the last entry remains.
137 assertEquals("Leader snapshot term", initialTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
138 assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
139 assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
140 assertEquals("Leader journal last index", 2, leaderContext.getReplicatedLog().lastIndex());
141 assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
142 assertEquals("Leader last applied", 2, leaderContext.getLastApplied());
143 assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
145 // Verify the follower's persisted journal log.
146 verifyPersistedJournal(follower1Id, origLeaderJournal);
147 verifyPersistedJournal(follower2Id, origLeaderJournal);
149 MessageCollectorActor.clearMessages(leaderCollectorActor);
150 MessageCollectorActor.clearMessages(follower1CollectorActor);
151 MessageCollectorActor.clearMessages(follower2CollectorActor);
153 testLog.info("verifyLeaderRecoveryAndInitialization ending");
157 * Send a payload to the TestRaftActor to persist and replicate. Since snapshotBatchCount is set to
158 * 4 and we already have 3 entries in the journal log, this should initiate a snapshot. In this
159 * scenario, the follower consensus and application of state is delayed until after the snapshot
163 private void testFirstSnapshot() throws Exception {
164 testLog.info("testFirstSnapshot starting");
166 expSnapshotState.add(recoveredPayload0);
167 expSnapshotState.add(recoveredPayload1);
168 expSnapshotState.add(recoveredPayload2);
170 // Delay the consensus by temporarily dropping the AppendEntries to both followers.
171 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
172 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
175 payload3 = sendPayloadData(leaderActor, "three");
177 // Wait for snapshot complete.
178 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
180 // The snapshot index should not be advanced nor the log trimmed because replicatedToAllIndex
181 // is behind due the followers not being replicated yet via AppendEntries.
182 assertEquals("Leader snapshot term", initialTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
183 assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
184 assertEquals("Leader journal log size", 2, leaderContext.getReplicatedLog().size());
185 assertEquals("Leader journal last index", 3, leaderContext.getReplicatedLog().lastIndex());
187 // Verify the persisted snapshot in the leader. This should reflect the advanced snapshot index as
188 // the last applied log entry (2) even though the leader hasn't yet advanced its cached snapshot index.
189 List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
190 assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
191 verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3);
192 List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
193 assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
194 verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
196 // The leader's persisted journal log should be cleared since we snapshotted.
197 List<ReplicatedLogImplEntry> persistedLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
198 assertEquals("Persisted journal log size", 0, persistedLeaderJournal.size());
200 // Allow AppendEntries to both followers to proceed. This should catch up the followers and cause a
201 // "fake" snapshot in the leader to advance the snapshot index to 2. Also the state should be applied
202 // in all members (via ApplyState).
203 follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
204 follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
206 ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
207 verifyApplyState(applyState, leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
209 verifyApplyJournalEntries(leaderCollectorActor, 3);
211 assertEquals("Leader commit index", 3, leaderContext.getCommitIndex());
213 applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
214 verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
216 verifyApplyJournalEntries(follower1CollectorActor, 3);
218 applyState = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
219 verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
221 verifyApplyJournalEntries(follower2CollectorActor, 3);
223 assertEquals("Leader snapshot term", initialTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
224 assertEquals("Leader snapshot index", 2, leaderContext.getReplicatedLog().getSnapshotIndex());
225 assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
226 assertEquals("Leader commit index", 3, leaderContext.getCommitIndex());
227 assertEquals("Leader last applied", 3, leaderContext.getLastApplied());
228 assertEquals("Leader replicatedToAllIndex", 2, leader.getReplicatedToAllIndex());
230 MessageCollectorActor.clearMessages(leaderCollectorActor);
231 MessageCollectorActor.clearMessages(follower1CollectorActor);
232 MessageCollectorActor.clearMessages(follower2CollectorActor);
234 testLog.info("testFirstSnapshot ending");
238 * Send 3 more payload instances and verify they get applied by all members.
240 private void testSubsequentReplications() {
241 testLog.info("testSubsequentReplications starting");
243 payload4 = sendPayloadData(leaderActor, "four");
244 payload5 = sendPayloadData(leaderActor, "five");
245 payload6 = sendPayloadData(leaderActor, "six");
247 // Verify the leader applies the states.
248 List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
249 verifyApplyState(applyStates.get(0), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
250 verifyApplyState(applyStates.get(1), leaderCollectorActor, payload5.toString(), currentTerm, 5, payload5);
251 verifyApplyState(applyStates.get(2), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
253 // Verify the leader applies a log entry for at least the last entry index.
254 verifyApplyJournalEntries(leaderCollectorActor, 6);
256 // The leader should have performed fake snapshots due to the follower's AppendEntriesReplies and
257 // trimmed the in-memory log so that only the last entry remains.
258 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
259 assertEquals("Leader snapshot index", 5, leaderContext.getReplicatedLog().getSnapshotIndex());
260 assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
261 assertEquals("Leader journal last index", 6, leaderContext.getReplicatedLog().lastIndex());
262 assertEquals("Leader commit index", 6, leaderContext.getCommitIndex());
263 assertEquals("Leader last applied", 6, leaderContext.getLastApplied());
264 assertEquals("Leader replicatedToAllIndex", 5, leader.getReplicatedToAllIndex());
266 // Verify follower 1 applies the states.
267 applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
268 verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
269 verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
270 verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
272 // Verify follower 1 applies a log entry for at least the last entry index.
273 verifyApplyJournalEntries(follower1CollectorActor, 6);
275 // Verify follower 2 applies the states.
276 applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3);
277 verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
278 verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
279 verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
281 // Verify follower 2 applies a log entry for at least the last entry index.
282 verifyApplyJournalEntries(follower2CollectorActor, 6);
284 MessageCollectorActor.clearMessages(leaderCollectorActor);
286 testLog.info("testSubsequentReplications ending");
290 * Send one more payload to trigger another snapshot. In this scenario, we delay the snapshot until
291 * consensus occurs and the leader applies the state.
294 private void testSecondSnapshot() throws Exception {
295 testLog.info("testSecondSnapshot starting");
297 expSnapshotState.add(payload3);
298 expSnapshotState.add(payload4);
299 expSnapshotState.add(payload5);
300 expSnapshotState.add(payload6);
302 // Delay the CaptureSnapshot message to the leader actor.
303 leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
306 payload7 = sendPayloadData(leaderActor, "seven");
308 // Capture the CaptureSnapshot message so we can send it later.
309 CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
310 leaderCollectorActor, CaptureSnapshot.class);
312 // Wait for the state to be applied in the leader.
313 ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
314 verifyApplyState(applyState, leaderCollectorActor, payload7.toString(), currentTerm, 7, payload7);
316 // At this point the leader has applied the new state but the cached snapshot index should not be
317 // advanced by a "fake" snapshot because we're in the middle of a snapshot. We'll wait for at least
318 // one more heartbeat AppendEntriesReply to ensure this does not occur.
319 MessageCollectorActor.clearMessages(leaderCollectorActor);
320 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
322 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
323 assertEquals("Leader snapshot index", 5, leaderContext.getReplicatedLog().getSnapshotIndex());
324 assertEquals("Leader journal log size", 2, leaderContext.getReplicatedLog().size());
325 assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
326 assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
327 assertEquals("Leader last applied", 7, leaderContext.getLastApplied());
328 assertEquals("Leader replicatedToAllIndex", 5, leader.getReplicatedToAllIndex());
330 // Now deliver the CaptureSnapshot.
331 leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
332 leaderActor.tell(captureSnapshot, leaderActor);
334 // Wait for CaptureSnapshotReply to complete.
335 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, CaptureSnapshotReply.class);
337 // Wait for snapshot complete.
338 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
340 // Wait for another heartbeat AppendEntriesReply. This should cause a "fake" snapshot to advance the
341 // snapshot index and trimmed the log since we're no longer in a snapshot.
342 MessageCollectorActor.clearMessages(leaderCollectorActor);
343 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
344 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
345 assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex());
346 assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
347 assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
348 assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
350 expSnapshotState.add(payload7);
352 // Verify the persisted snapshot. This should reflect the snapshot index as the last applied
353 // log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data
354 // when the snapshot is created (ie when the CaptureSnapshot is processed).
355 List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
356 assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
357 verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7);
358 List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
359 assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
360 verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
362 // The leader's persisted journal log should be cleared since we did a snapshot.
363 List<ReplicatedLogImplEntry> persistedLeaderJournal = InMemoryJournal.get(
364 leaderId, ReplicatedLogImplEntry.class);
365 assertEquals("Persisted journal log size", 0, persistedLeaderJournal.size());
367 // Verify the followers apply all 4 new log entries.
368 List<ApplyState> applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 4);
369 verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
370 verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
371 verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
372 verifyApplyState(applyStates.get(3), null, null, currentTerm, 7, payload7);
374 applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 4);
375 verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
376 verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
377 verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
378 verifyApplyState(applyStates.get(3), null, null, currentTerm, 7, payload7);
380 // Verify the follower's snapshot index has also advanced. (after another AppendEntries heartbeat
383 MessageCollectorActor.clearMessages(follower1CollectorActor);
384 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
385 RaftActorContext follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
386 assertEquals("Follower 1 snapshot term", currentTerm, follower1Context.getReplicatedLog().getSnapshotTerm());
387 assertEquals("Follower 1 snapshot index", 6, follower1Context.getReplicatedLog().getSnapshotIndex());
388 assertEquals("Follower 1 journal log size", 1, follower1Context.getReplicatedLog().size());
389 assertEquals("Follower 1 journal last index", 7, follower1Context.getReplicatedLog().lastIndex());
390 assertEquals("Follower 1 commit index", 7, follower1Context.getCommitIndex());
392 MessageCollectorActor.clearMessages(follower2CollectorActor);
393 MessageCollectorActor.expectFirstMatching(follower2CollectorActor, AppendEntries.class);
394 RaftActorContext follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
395 assertEquals("Follower 2 snapshot term", currentTerm, follower2Context.getReplicatedLog().getSnapshotTerm());
396 assertEquals("Follower 2 snapshot index", 6, follower2Context.getReplicatedLog().getSnapshotIndex());
397 assertEquals("Follower 2 journal log size", 1, follower2Context.getReplicatedLog().size());
398 assertEquals("Follower 2 journal last index", 7, follower2Context.getReplicatedLog().lastIndex());
399 assertEquals("Follower 2 commit index", 7, follower2Context.getCommitIndex());
401 testLog.info("testSecondSnapshot ending");
405 * Kill the leader actor, reinstate it and verify the recovered journal.
407 private void testLeaderReinstatement() {
408 testLog.info("testLeaderReinstatement starting");
410 killActor(leaderActor);
412 leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
414 leaderActor.underlyingActor().waitForRecoveryComplete();
416 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
418 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
419 assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex());
420 assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
421 assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
422 assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
423 assertEquals("Leader last applied", 7, leaderContext.getLastApplied());
424 verifyReplicatedLogEntry(leaderContext.getReplicatedLog().last(), currentTerm, 7, payload7);
426 testLog.info("testLeaderReinstatement ending");