2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import akka.persistence.SaveSnapshotSuccess;
12 import com.google.common.collect.ImmutableMap;
13 import java.util.List;
14 import org.junit.Test;
15 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
16 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
17 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
18 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
19 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
20 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
21 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
22 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
23 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
24 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
25 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
28 * Tests replication and snapshots end-to-end using real RaftActors and behavior communication.
30 * @author Thomas Pantelis
32 public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorIntegrationTest {
34 private List<ReplicatedLogImplEntry> origLeaderJournal;
36 private MockPayload recoveredPayload0;
37 private MockPayload recoveredPayload1;
38 private MockPayload recoveredPayload2;
39 private MockPayload payload4;
40 private MockPayload payload5;
41 private MockPayload payload6;
42 private MockPayload payload7;
45 public void runTest() {
46 testLog.info("testReplicationAndSnapshots starting");
48 // Setup the persistent journal for the leader. We'll start up with 3 journal log entries (one less
49 // than the snapshotBatchCount).
51 InMemoryJournal.addEntry(leaderId, seqId++, new UpdateElectionTerm(initialTerm, leaderId));
52 recoveredPayload0 = new MockPayload("zero");
53 InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(0, initialTerm, recoveredPayload0));
54 recoveredPayload1 = new MockPayload("one");
55 InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(1, initialTerm, recoveredPayload1));
56 recoveredPayload2 = new MockPayload("two");
57 InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(2, initialTerm, recoveredPayload2));
58 InMemoryJournal.addEntry(leaderId, seqId++, new ApplyLogEntries(2));
60 origLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
62 // Create the leader and 2 follower actors and verify initial syncing of the followers after leader
63 // persistence recovery.
65 follower1Actor = newTestRaftActor(follower1Id, null, newFollowerConfigParams());
67 follower2Actor = newTestRaftActor(follower2Id, null, newFollowerConfigParams());
69 peerAddresses = ImmutableMap.<String, String>builder().
70 put(follower1Id, follower1Actor.path().toString()).
71 put(follower2Id, follower2Actor.path().toString()).build();
73 leaderConfigParams = newLeaderConfigParams();
74 leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
76 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
77 follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
78 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
80 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
82 verifyLeaderRecoveryAndInitialization();
86 testSubsequentReplications();
90 testLeaderReinstatement();
92 testLog.info("testReplicationAndSnapshots ending");
96 * Verify the expected leader is elected as the leader and verify initial syncing of the followers
97 * from the leader's persistence recovery.
99 void verifyLeaderRecoveryAndInitialization() {
100 testLog.info("verifyLeaderRecoveryAndInitialization starting");
102 waitUntilLeader(leaderActor);
104 currentTerm = leaderContext.getTermInformation().getCurrentTerm();
105 assertEquals("Current term > " + initialTerm, true, currentTerm > initialTerm);
107 leader = leaderActor.underlyingActor().getCurrentBehavior();
109 // The followers should receive AppendEntries for each leader log entry that was recovered from
110 // persistence and apply each one.
111 List<ApplyState> applyStates = MessageCollectorActor.expectMatching(
112 follower1CollectorActor, ApplyState.class, 3);
113 verifyApplyState(applyStates.get(0), null, null, initialTerm, 0, recoveredPayload0);
114 verifyApplyState(applyStates.get(1), null, null, initialTerm, 1, recoveredPayload1);
115 verifyApplyState(applyStates.get(2), null, null, initialTerm, 2, recoveredPayload2);
117 // Verify follower 1 applies a log entry for at least the last entry index.
118 verifyApplyJournalEntries(follower1CollectorActor, 2);
120 applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3);
121 verifyApplyState(applyStates.get(0), null, null, initialTerm, 0, recoveredPayload0);
122 verifyApplyState(applyStates.get(1), null, null, initialTerm, 1, recoveredPayload1);
123 verifyApplyState(applyStates.get(2), null, null, initialTerm, 2, recoveredPayload2);
125 // Verify follower 1]2 applies a log entry for at least the last entry index.
126 verifyApplyJournalEntries(follower2CollectorActor, 2);
128 MessageCollectorActor.clearMessages(leaderCollectorActor);
129 MessageCollectorActor.clearMessages(follower1CollectorActor);
130 MessageCollectorActor.clearMessages(follower2CollectorActor);
132 // The leader should have performed fake snapshots due to the follower's AppendEntriesReplies and
133 // trimmed the in-memory log so that only the last entry remains.
134 assertEquals("Leader snapshot term", initialTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
135 assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
136 assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
137 assertEquals("Leader journal last index", 2, leaderContext.getReplicatedLog().lastIndex());
138 assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
139 assertEquals("Leader last applied", 2, leaderContext.getLastApplied());
140 assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
142 // Verify the follower's persisted journal log.
143 verifyPersistedJournal(follower1Id, origLeaderJournal);
144 verifyPersistedJournal(follower2Id, origLeaderJournal);
146 MessageCollectorActor.clearMessages(leaderCollectorActor);
147 MessageCollectorActor.clearMessages(follower1CollectorActor);
148 MessageCollectorActor.clearMessages(follower2CollectorActor);
150 testLog.info("verifyLeaderRecoveryAndInitialization ending");
154 * Send a payload to the TestRaftActor to persist and replicate. Since snapshotBatchCount is set to
155 * 4 and we already have 3 entries in the journal log, this should initiate a snapshot. In this
156 * scenario, the follower consensus and application of state is delayed until after the snapshot
159 private void testFirstSnapshot() {
160 testLog.info("testFirstSnapshot starting");
162 byte[] snapshot = new byte[] {1,2,3,4};
163 leaderActor.underlyingActor().setSnapshot(snapshot);
165 // Delay the consensus by temporarily dropping the AppendEntries to both followers.
166 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
167 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
170 MockPayload payload3 = sendPayloadData(leaderActor, "three");
172 // Wait for snapshot complete.
173 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
175 // The snapshot index should not be advanced nor the log trimmed because replicatedToAllIndex
176 // is behind due the followers not being replicated yet via AppendEntries.
177 assertEquals("Leader snapshot term", initialTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
178 assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
179 assertEquals("Leader journal log size", 2, leaderContext.getReplicatedLog().size());
180 assertEquals("Leader journal last index", 3, leaderContext.getReplicatedLog().lastIndex());
182 // Verify the persisted snapshot in the leader. This should reflect the advanced snapshot index as
183 // the last applied log entry (2) even though the leader hasn't yet advanced its cached snapshot index.
184 List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
185 assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
186 verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3, snapshot);
187 List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
188 assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
189 verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
191 // The leader's persisted journal log should be cleared since we snapshotted.
192 List<ReplicatedLogImplEntry> persistedLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
193 assertEquals("Persisted journal log size", 0, persistedLeaderJournal.size());
195 // Allow AppendEntries to both followers to proceed. This should catch up the followers and cause a
196 // "fake" snapshot in the leader to advance the snapshot index to 2. Also the state should be applied
197 // in all members (via ApplyState).
198 follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
199 follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
201 ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
202 verifyApplyState(applyState, leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
204 verifyApplyJournalEntries(leaderCollectorActor, 3);
206 assertEquals("Leader commit index", 3, leaderContext.getCommitIndex());
208 applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
209 verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
211 verifyApplyJournalEntries(follower1CollectorActor, 3);
213 applyState = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
214 verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
216 verifyApplyJournalEntries(follower2CollectorActor, 3);
218 assertEquals("Leader snapshot term", initialTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
219 assertEquals("Leader snapshot index", 2, leaderContext.getReplicatedLog().getSnapshotIndex());
220 assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
221 assertEquals("Leader commit index", 3, leaderContext.getCommitIndex());
222 assertEquals("Leader last applied", 3, leaderContext.getLastApplied());
223 assertEquals("Leader replicatedToAllIndex", 2, leader.getReplicatedToAllIndex());
225 MessageCollectorActor.clearMessages(leaderCollectorActor);
226 MessageCollectorActor.clearMessages(follower1CollectorActor);
227 MessageCollectorActor.clearMessages(follower2CollectorActor);
229 testLog.info("testFirstSnapshot ending");
233 * Send 3 more payload instances and verify they get applied by all members.
235 private void testSubsequentReplications() {
236 testLog.info("testSubsequentReplications starting");
238 payload4 = sendPayloadData(leaderActor, "four");
239 payload5 = sendPayloadData(leaderActor, "five");
240 payload6 = sendPayloadData(leaderActor, "six");
242 // Verify the leader applies the states.
243 List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
244 verifyApplyState(applyStates.get(0), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
245 verifyApplyState(applyStates.get(1), leaderCollectorActor, payload5.toString(), currentTerm, 5, payload5);
246 verifyApplyState(applyStates.get(2), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
248 // Verify the leader applies a log entry for at least the last entry index.
249 verifyApplyJournalEntries(leaderCollectorActor, 6);
251 // The leader should have performed fake snapshots due to the follower's AppendEntriesReplies and
252 // trimmed the in-memory log so that only the last entry remains.
253 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
254 assertEquals("Leader snapshot index", 5, leaderContext.getReplicatedLog().getSnapshotIndex());
255 assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
256 assertEquals("Leader journal last index", 6, leaderContext.getReplicatedLog().lastIndex());
257 assertEquals("Leader commit index", 6, leaderContext.getCommitIndex());
258 assertEquals("Leader last applied", 6, leaderContext.getLastApplied());
259 assertEquals("Leader replicatedToAllIndex", 5, leader.getReplicatedToAllIndex());
261 // Verify follower 1 applies the states.
262 applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
263 verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
264 verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
265 verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
267 // Verify follower 1 applies a log entry for at least the last entry index.
268 verifyApplyJournalEntries(follower1CollectorActor, 6);
270 // Verify follower 2 applies the states.
271 applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3);
272 verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
273 verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
274 verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
276 // Verify follower 2 applies a log entry for at least the last entry index.
277 verifyApplyJournalEntries(follower2CollectorActor, 6);
279 MessageCollectorActor.clearMessages(leaderCollectorActor);
281 testLog.info("testSubsequentReplications ending");
285 * Send one more payload to trigger another snapshot. In this scenario, we delay the snapshot until
286 * consensus occurs and the leader applies the state.
288 private void testSecondSnapshot() {
289 testLog.info("testSecondSnapshot starting");
291 byte[] snapshot = new byte[] {5,6,7,8};
292 leaderActor.underlyingActor().setSnapshot(snapshot);
294 // Delay the CaptureSnapshot message to the leader actor.
295 leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
298 payload7 = sendPayloadData(leaderActor, "seven");
300 // Capture the CaptureSnapshot message so we can send it later.
301 CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
302 leaderCollectorActor, CaptureSnapshot.class);
304 // Wait for the state to be applied in the leader.
305 ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
306 verifyApplyState(applyState, leaderCollectorActor, payload7.toString(), currentTerm, 7, payload7);
308 // At this point the leader has applied the new state but the cached snapshot index should not be
309 // advanced by a "fake" snapshot because we're in the middle of a snapshot. We'll wait for at least
310 // one more heartbeat AppendEntriesReply to ensure this does not occur.
311 MessageCollectorActor.clearMessages(leaderCollectorActor);
312 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
314 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
315 assertEquals("Leader snapshot index", 5, leaderContext.getReplicatedLog().getSnapshotIndex());
316 assertEquals("Leader journal log size", 2, leaderContext.getReplicatedLog().size());
317 assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
318 assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
319 assertEquals("Leader last applied", 7, leaderContext.getLastApplied());
320 assertEquals("Leader replicatedToAllIndex", 5, leader.getReplicatedToAllIndex());
322 // Now deliver the CaptureSnapshot.
323 leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
324 leaderActor.tell(captureSnapshot, leaderActor);
326 // Wait for CaptureSnapshotReply to complete.
327 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, CaptureSnapshotReply.class);
329 // Wait for snapshot complete.
330 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
332 // Wait for another heartbeat AppendEntriesReply. This should cause a "fake" snapshot to advance the
333 // snapshot index and trimmed the log since we're no longer in a snapshot.
334 MessageCollectorActor.clearMessages(leaderCollectorActor);
335 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class);
336 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
337 assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex());
338 assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
339 assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
340 assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
342 // Verify the persisted snapshot. This should reflect the advanced snapshot index as the last applied
344 List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
345 assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
346 verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7, snapshot);
347 List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
348 assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
349 verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
351 // The leader's persisted journal log should be cleared since we did a snapshot.
352 List<ReplicatedLogImplEntry> persistedLeaderJournal = InMemoryJournal.get(
353 leaderId, ReplicatedLogImplEntry.class);
354 assertEquals("Persisted journal log size", 0, persistedLeaderJournal.size());
356 // Verify the followers apply all 4 new log entries.
357 List<ApplyState> applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 4);
358 verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
359 verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
360 verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
361 verifyApplyState(applyStates.get(3), null, null, currentTerm, 7, payload7);
363 applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 4);
364 verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
365 verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
366 verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
367 verifyApplyState(applyStates.get(3), null, null, currentTerm, 7, payload7);
369 // Verify the follower's snapshot index has also advanced. (after another AppendEntries heartbeat
372 MessageCollectorActor.clearMessages(follower1CollectorActor);
373 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
374 RaftActorContext follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
375 assertEquals("Follower 1 snapshot term", currentTerm, follower1Context.getReplicatedLog().getSnapshotTerm());
376 assertEquals("Follower 1 snapshot index", 6, follower1Context.getReplicatedLog().getSnapshotIndex());
377 assertEquals("Follower 1 journal log size", 1, follower1Context.getReplicatedLog().size());
378 assertEquals("Follower 1 journal last index", 7, follower1Context.getReplicatedLog().lastIndex());
379 assertEquals("Follower 1 commit index", 7, follower1Context.getCommitIndex());
381 MessageCollectorActor.clearMessages(follower2CollectorActor);
382 MessageCollectorActor.expectFirstMatching(follower2CollectorActor, AppendEntries.class);
383 RaftActorContext follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
384 assertEquals("Follower 2 snapshot term", currentTerm, follower2Context.getReplicatedLog().getSnapshotTerm());
385 assertEquals("Follower 2 snapshot index", 6, follower2Context.getReplicatedLog().getSnapshotIndex());
386 assertEquals("Follower 2 journal log size", 1, follower2Context.getReplicatedLog().size());
387 assertEquals("Follower 2 journal last index", 7, follower2Context.getReplicatedLog().lastIndex());
388 assertEquals("Follower 2 commit index", 7, follower2Context.getCommitIndex());
390 testLog.info("testSecondSnapshot ending");
394 * Kill the leader actor, reinstate it and verify the recovered journal.
396 private void testLeaderReinstatement() {
397 testLog.info("testLeaderReinstatement starting");
399 killActor(leaderActor);
401 leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
403 leaderActor.underlyingActor().waitForRecoveryComplete();
405 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
406 assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex());
407 assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
408 assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
409 assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
410 assertEquals("Leader last applied", 7, leaderContext.getLastApplied());
411 verifyReplicatedLogEntry(leaderContext.getReplicatedLog().last(), currentTerm, 7, payload7);
413 testLog.info("testLeaderReinstatement ending");