2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import akka.actor.ActorRef;
12 import akka.persistence.SaveSnapshotSuccess;
13 import com.google.common.collect.ImmutableMap;
14 import java.util.Arrays;
15 import java.util.HashMap;
16 import java.util.List;
18 import org.junit.Before;
19 import org.junit.Test;
20 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
21 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
22 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
23 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
24 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
25 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
26 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
29 * Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication.
31 * @author Thomas Pantelis
33 public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
35 private MockPayload payload0;
36 private MockPayload payload1;
40 follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
41 newFollowerConfigParams());
43 Map<String, String> peerAddresses = new HashMap<>();
44 peerAddresses.put(follower1Id, follower1Actor.path().toString());
45 peerAddresses.put(follower2Id, "");
47 leaderConfigParams = newLeaderConfigParams();
48 leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
50 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
51 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
53 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
57 public void testStatePersistedBetweenSnapshotCaptureAndPersist() {
59 send2InitialPayloads();
61 // Block these messages initially so we can control the sequence.
62 leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
63 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
65 MockPayload payload2 = sendPayloadData(leaderActor, "two");
67 // This should trigger a snapshot.
68 MockPayload payload3 = sendPayloadData(leaderActor, "three");
70 MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
72 // Send another payload.
73 MockPayload payload4 = sendPayloadData(leaderActor, "four");
75 // Now deliver the AppendEntries to the follower
76 follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
78 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
80 // Now deliver the CaptureSnapshotReply to the leader.
81 CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
82 leaderCollectorActor, CaptureSnapshotReply.class);
83 leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
84 leaderActor.tell(captureSnapshotReply, leaderActor);
86 // Wait for snapshot complete.
87 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
89 reinstateLeaderActor();
91 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
92 assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
93 assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
94 assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
95 assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
96 assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
98 assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
99 leaderActor.underlyingActor().getState());
103 public void testStatePersistedAfterSnapshotPersisted() {
105 send2InitialPayloads();
107 // Block these messages initially so we can control the sequence.
108 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
110 MockPayload payload2 = sendPayloadData(leaderActor, "two");
112 // This should trigger a snapshot.
113 MockPayload payload3 = sendPayloadData(leaderActor, "three");
115 // Send another payload.
116 MockPayload payload4 = sendPayloadData(leaderActor, "four");
118 MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
120 // Wait for snapshot complete.
121 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
123 // Now deliver the AppendEntries to the follower
124 follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
126 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
128 reinstateLeaderActor();
130 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
131 assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
132 assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
133 assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
134 assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
135 assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
137 assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
138 leaderActor.underlyingActor().getState());
142 public void testFollowerRecoveryAfterInstallSnapshot() throws Exception {
144 send2InitialPayloads();
146 leader = leaderActor.underlyingActor().getCurrentBehavior();
148 follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
149 newFollowerConfigParams());
150 follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
152 leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
154 MockPayload payload2 = sendPayloadData(leaderActor, "two");
156 // Verify the leader applies the 3rd payload state.
157 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
159 MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyJournalEntries.class, 1);
161 assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
162 assertEquals("Leader last applied", 2, leaderContext.getLastApplied());
163 assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
164 assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
166 killActor(follower2Actor);
168 InMemoryJournal.clear();
170 follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
171 newFollowerConfigParams());
172 TestRaftActor follower2Underlying = follower2Actor.underlyingActor();
173 follower2CollectorActor = follower2Underlying.collectorActor();
174 follower2Context = follower2Underlying.getRaftActorContext();
176 leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
178 // The leader should install a snapshot so wait for the follower to receive ApplySnapshot.
179 MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
181 // Wait for the follower to persist the snapshot.
182 MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
184 List<MockPayload> expFollowerState = Arrays.asList(payload0, payload1, payload2);
186 assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
187 assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
188 assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
189 assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
191 killActor(follower2Actor);
193 follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
194 newFollowerConfigParams());
196 follower2Underlying = follower2Actor.underlyingActor();
197 follower2Underlying.waitForRecoveryComplete();
198 follower2Context = follower2Underlying.getRaftActorContext();
200 assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
201 assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
202 assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
203 assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
206 private void reinstateLeaderActor() {
207 killActor(leaderActor);
209 leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
211 leaderActor.underlyingActor().waitForRecoveryComplete();
213 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
216 private void send2InitialPayloads() {
217 waitUntilLeader(leaderActor);
218 currentTerm = leaderContext.getTermInformation().getCurrentTerm();
220 payload0 = sendPayloadData(leaderActor, "zero");
222 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
224 payload1 = sendPayloadData(leaderActor, "one");
226 // Verify the leader applies the states.
227 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
229 assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
231 MessageCollectorActor.clearMessages(leaderCollectorActor);
232 MessageCollectorActor.clearMessages(follower1CollectorActor);