/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.cluster.raft;

import static org.junit.Assert.assertEquals;

import akka.actor.ActorRef;
import akka.persistence.SaveSnapshotSuccess;
import java.util.List;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
27 * Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication.
29 * @author Thomas Pantelis
31 public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
33 private MockPayload payload0;
34 private MockPayload payload1;
38 follower1Actor = newTestRaftActor(follower1Id, Map.of(leaderId, testActorPath(leaderId)),
39 newFollowerConfigParams());
41 leaderConfigParams = newLeaderConfigParams();
42 leaderActor = newTestRaftActor(leaderId, Map.of(follower1Id, follower1Actor.path().toString(), follower2Id, ""),
45 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
46 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
48 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
52 public void testStatePersistedBetweenSnapshotCaptureAndPersist() {
54 send2InitialPayloads();
56 // Block these messages initially so we can control the sequence.
57 leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
58 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
60 final MockPayload payload2 = sendPayloadData(leaderActor, "two");
62 // This should trigger a snapshot.
63 final MockPayload payload3 = sendPayloadData(leaderActor, "three");
65 MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
67 // Send another payload.
68 final MockPayload payload4 = sendPayloadData(leaderActor, "four");
70 // Now deliver the AppendEntries to the follower
71 follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
73 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
75 // Now deliver the CaptureSnapshotReply to the leader.
76 CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
77 leaderCollectorActor, CaptureSnapshotReply.class);
78 leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
79 leaderActor.tell(captureSnapshotReply, leaderActor);
81 // Wait for snapshot complete.
82 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
84 reinstateLeaderActor();
86 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
87 assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
88 assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
89 assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
90 assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
91 assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
93 assertEquals("Leader state", List.of(payload0, payload1, payload2, payload3, payload4),
94 leaderActor.underlyingActor().getState());
98 public void testStatePersistedAfterSnapshotPersisted() {
100 send2InitialPayloads();
102 // Block these messages initially so we can control the sequence.
103 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
105 final MockPayload payload2 = sendPayloadData(leaderActor, "two");
107 // This should trigger a snapshot.
108 final MockPayload payload3 = sendPayloadData(leaderActor, "three");
110 // Send another payload.
111 final MockPayload payload4 = sendPayloadData(leaderActor, "four");
113 MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
115 // Wait for snapshot complete.
116 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
118 // Now deliver the AppendEntries to the follower
119 follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
121 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
123 reinstateLeaderActor();
125 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
126 assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
127 assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
128 assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
129 assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
130 assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
132 assertEquals("Leader state", List.of(payload0, payload1, payload2, payload3, payload4),
133 leaderActor.underlyingActor().getState());
137 public void testFollowerRecoveryAfterInstallSnapshot() {
139 send2InitialPayloads();
141 leader = leaderActor.underlyingActor().getCurrentBehavior();
143 follower2Actor = newTestRaftActor(follower2Id,
144 Map.of(leaderId, testActorPath(leaderId)), newFollowerConfigParams());
145 follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
147 leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
149 final MockPayload payload2 = sendPayloadData(leaderActor, "two");
151 // Verify the leader applies the 3rd payload state.
152 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
154 MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyJournalEntries.class, 1);
156 assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
157 assertEquals("Leader last applied", 2, leaderContext.getLastApplied());
158 assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
159 assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
161 killActor(follower2Actor);
163 InMemoryJournal.clear();
165 follower2Actor = newTestRaftActor(follower2Id,
166 Map.of(leaderId, testActorPath(leaderId)), newFollowerConfigParams());
167 TestRaftActor follower2Underlying = follower2Actor.underlyingActor();
168 follower2CollectorActor = follower2Underlying.collectorActor();
169 follower2Context = follower2Underlying.getRaftActorContext();
171 leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
173 // The leader should install a snapshot so wait for the follower to receive ApplySnapshot.
174 MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
176 // Wait for the follower to persist the snapshot.
177 MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
179 final List<MockPayload> expFollowerState = List.of(payload0, payload1, payload2);
181 assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
182 assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
183 assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
184 assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
186 killActor(follower2Actor);
188 follower2Actor = newTestRaftActor(follower2Id, Map.of(leaderId, testActorPath(leaderId)),
189 newFollowerConfigParams());
191 follower2Underlying = follower2Actor.underlyingActor();
192 follower2Underlying.waitForRecoveryComplete();
193 follower2Context = follower2Underlying.getRaftActorContext();
195 assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
196 assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
197 assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
198 assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
202 public void testRecoveryDeleteEntries() {
203 send2InitialPayloads();
205 sendPayloadData(leaderActor, "two");
207 // This should trigger a snapshot.
208 sendPayloadData(leaderActor, "three");
210 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
211 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
213 // Disconnect follower from leader
214 killActor(follower1Actor);
216 // Send another payloads
217 sendPayloadData(leaderActor, "four");
218 sendPayloadData(leaderActor, "five");
220 verifyRaftState(leaderActor, raftState -> {
221 assertEquals("leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
224 // Remove entries started from 4 index
225 leaderActor.underlyingActor().getReplicatedLog().removeFromAndPersist(4);
227 verifyRaftState(leaderActor, raftState -> {
228 assertEquals("leader journal last index", 3, leaderContext.getReplicatedLog().lastIndex());
232 final MockPayload payload4 = sendPayloadData(leaderActor, "newFour");
233 final MockPayload payload5 = sendPayloadData(leaderActor, "newFive");
235 verifyRaftState(leaderActor, raftState -> {
236 assertEquals("leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
239 reinstateLeaderActor();
241 final var log = leaderActor.underlyingActor().getReplicatedLog();
242 assertEquals("Leader last index", 5, log.lastIndex());
243 assertEquals(List.of(payload4, payload5), List.of(log.get(4).getData(), log.get(5).getData()));
246 private void reinstateLeaderActor() {
247 killActor(leaderActor);
249 leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
251 leaderActor.underlyingActor().waitForRecoveryComplete();
253 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
256 private void send2InitialPayloads() {
257 waitUntilLeader(leaderActor);
258 currentTerm = leaderContext.getTermInformation().getCurrentTerm();
260 payload0 = sendPayloadData(leaderActor, "zero");
262 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
264 payload1 = sendPayloadData(leaderActor, "one");
266 // Verify the leader applies the states.
267 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
269 assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
271 MessageCollectorActor.clearMessages(leaderCollectorActor);
272 MessageCollectorActor.clearMessages(follower1CollectorActor);