2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import akka.persistence.SaveSnapshotSuccess;
12 import com.google.common.collect.ImmutableMap;
13 import java.util.Arrays;
14 import org.junit.Before;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
17 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
18 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
19 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
20 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
21 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
24 * Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication.
26 * @author Thomas Pantelis
28 public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
30 private MockPayload payload0;
31 private MockPayload payload1;
35 follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
36 newFollowerConfigParams());
38 peerAddresses = ImmutableMap.<String, String>builder().
39 put(follower1Id, follower1Actor.path().toString()).build();
41 leaderConfigParams = newLeaderConfigParams();
42 leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
44 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
45 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
47 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
51 public void testStatePersistedBetweenSnapshotCaptureAndPersist() {
53 send2InitialPayloads();
55 // Block these messages initially so we can control the sequence.
56 leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
57 leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
58 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
60 MockPayload payload2 = sendPayloadData(leaderActor, "two");
62 // This should trigger a snapshot.
63 MockPayload payload3 = sendPayloadData(leaderActor, "three");
65 MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
67 CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
68 leaderCollectorActor, CaptureSnapshot.class);
70 // First, deliver the CaptureSnapshot to the leader.
71 leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
72 leaderActor.tell(captureSnapshot, leaderActor);
74 // Send another payload.
75 MockPayload payload4 = sendPayloadData(leaderActor, "four");
77 // Now deliver the AppendEntries to the follower
78 follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
80 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
82 // Now deliver the CaptureSnapshotReply to the leader.
83 CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
84 leaderCollectorActor, CaptureSnapshotReply.class);
85 leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
86 leaderActor.tell(captureSnapshotReply, leaderActor);
88 // Wait for snapshot complete.
89 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
91 reinstateLeaderActor();
93 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
94 assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
95 assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
96 assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
97 assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
98 assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
100 assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
101 leaderActor.underlyingActor().getState());
105 public void testStatePersistedBetweenInitiateSnapshotAndCapture() {
107 send2InitialPayloads();
109 // Block these messages initially so we can control the sequence.
110 leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
111 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
113 MockPayload payload2 = sendPayloadData(leaderActor, "two");
115 // This should trigger a snapshot.
116 MockPayload payload3 = sendPayloadData(leaderActor, "three");
118 // Send another payload.
119 MockPayload payload4 = sendPayloadData(leaderActor, "four");
121 MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
123 CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
124 leaderCollectorActor, CaptureSnapshot.class);
126 // First, deliver the AppendEntries to the follower
127 follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
129 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
131 // Now deliver the CaptureSnapshot to the leader.
132 leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
133 leaderActor.tell(captureSnapshot, leaderActor);
135 // Wait for snapshot complete.
136 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
138 reinstateLeaderActor();
140 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
141 assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
142 assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
143 assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
144 assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
145 assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
147 // payloads 2, 3, and 4 were applied after the snapshot was initiated and before it was captured so
148 // were included in the snapshot. They were also included as unapplied entries in the snapshot as
149 // they weren't yet applied to the state at the time the snapshot was initiated. They were applied to the
150 // state on recovery by the ApplyJournalEntries messages which remained in the persisted log.
151 // This is a side effect of trimming the persisted log to the sequence number captured at the time
152 // the snapshot was initiated.
153 assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4, payload2,
154 payload3, payload4), leaderActor.underlyingActor().getState());
158 public void testApplyJournalEntriesPersistedAfterSnapshotPersisted() {
160 send2InitialPayloads();
162 // Block these messages initially so we can control the sequence.
163 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
165 MockPayload payload2 = sendPayloadData(leaderActor, "two");
167 // This should trigger a snapshot.
168 MockPayload payload3 = sendPayloadData(leaderActor, "three");
170 // Send another payload.
171 MockPayload payload4 = sendPayloadData(leaderActor, "four");
173 MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
175 // Wait for snapshot complete.
176 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
178 // Now deliver the AppendEntries to the follower
179 follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
181 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
183 reinstateLeaderActor();
185 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
186 assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
187 assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
188 assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
189 assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
190 assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
192 assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
193 leaderActor.underlyingActor().getState());
196 private void reinstateLeaderActor() {
197 killActor(leaderActor);
199 leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
201 leaderActor.underlyingActor().waitForRecoveryComplete();
203 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
206 private void send2InitialPayloads() {
207 waitUntilLeader(leaderActor);
208 currentTerm = leaderContext.getTermInformation().getCurrentTerm();
210 payload0 = sendPayloadData(leaderActor, "zero");
211 payload1 = sendPayloadData(leaderActor, "one");
213 // Verify the leader applies the states.
214 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
216 assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
218 MessageCollectorActor.clearMessages(leaderCollectorActor);
219 MessageCollectorActor.clearMessages(follower1CollectorActor);