/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.cluster.raft;

import static org.junit.Assert.assertEquals;

import akka.actor.ActorRef;
import akka.persistence.SaveSnapshotSuccess;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
30 * Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication.
32 * @author Thomas Pantelis
34 public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
36 private MockPayload payload0;
37 private MockPayload payload1;
41 follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
42 newFollowerConfigParams());
44 Map<String, String> leaderPeerAddresses = new HashMap<>();
45 leaderPeerAddresses.put(follower1Id, follower1Actor.path().toString());
46 leaderPeerAddresses.put(follower2Id, "");
48 leaderConfigParams = newLeaderConfigParams();
49 leaderActor = newTestRaftActor(leaderId, leaderPeerAddresses, leaderConfigParams);
51 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
52 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
54 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
58 public void testStatePersistedBetweenSnapshotCaptureAndPersist() {
60 send2InitialPayloads();
62 // Block these messages initially so we can control the sequence.
63 leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
64 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
66 final MockPayload payload2 = sendPayloadData(leaderActor, "two");
68 // This should trigger a snapshot.
69 final MockPayload payload3 = sendPayloadData(leaderActor, "three");
71 MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
73 // Send another payload.
74 final MockPayload payload4 = sendPayloadData(leaderActor, "four");
76 // Now deliver the AppendEntries to the follower
77 follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
79 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
81 // Now deliver the CaptureSnapshotReply to the leader.
82 CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
83 leaderCollectorActor, CaptureSnapshotReply.class);
84 leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
85 leaderActor.tell(captureSnapshotReply, leaderActor);
87 // Wait for snapshot complete.
88 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
90 reinstateLeaderActor();
92 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
93 assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
94 assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
95 assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
96 assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
97 assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
99 assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
100 leaderActor.underlyingActor().getState());
104 public void testStatePersistedAfterSnapshotPersisted() {
106 send2InitialPayloads();
108 // Block these messages initially so we can control the sequence.
109 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
111 final MockPayload payload2 = sendPayloadData(leaderActor, "two");
113 // This should trigger a snapshot.
114 final MockPayload payload3 = sendPayloadData(leaderActor, "three");
116 // Send another payload.
117 final MockPayload payload4 = sendPayloadData(leaderActor, "four");
119 MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
121 // Wait for snapshot complete.
122 MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
124 // Now deliver the AppendEntries to the follower
125 follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
127 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
129 reinstateLeaderActor();
131 assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
132 assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
133 assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
134 assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
135 assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
136 assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
138 assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
139 leaderActor.underlyingActor().getState());
143 public void testFollowerRecoveryAfterInstallSnapshot() throws Exception {
145 send2InitialPayloads();
147 leader = leaderActor.underlyingActor().getCurrentBehavior();
149 follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
150 newFollowerConfigParams());
151 follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
153 leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
155 final MockPayload payload2 = sendPayloadData(leaderActor, "two");
157 // Verify the leader applies the 3rd payload state.
158 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
160 MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyJournalEntries.class, 1);
162 assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
163 assertEquals("Leader last applied", 2, leaderContext.getLastApplied());
164 assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
165 assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
167 killActor(follower2Actor);
169 InMemoryJournal.clear();
171 follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
172 newFollowerConfigParams());
173 TestRaftActor follower2Underlying = follower2Actor.underlyingActor();
174 follower2CollectorActor = follower2Underlying.collectorActor();
175 follower2Context = follower2Underlying.getRaftActorContext();
177 leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
179 // The leader should install a snapshot so wait for the follower to receive ApplySnapshot.
180 MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
182 // Wait for the follower to persist the snapshot.
183 MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
185 final List<MockPayload> expFollowerState = Arrays.asList(payload0, payload1, payload2);
187 assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
188 assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
189 assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
190 assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
192 killActor(follower2Actor);
194 follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
195 newFollowerConfigParams());
197 follower2Underlying = follower2Actor.underlyingActor();
198 follower2Underlying.waitForRecoveryComplete();
199 follower2Context = follower2Underlying.getRaftActorContext();
201 assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
202 assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
203 assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
204 assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
207 private void reinstateLeaderActor() {
208 killActor(leaderActor);
210 leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
212 leaderActor.underlyingActor().waitForRecoveryComplete();
214 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
217 private void send2InitialPayloads() {
218 waitUntilLeader(leaderActor);
219 currentTerm = leaderContext.getTermInformation().getCurrentTerm();
221 payload0 = sendPayloadData(leaderActor, "zero");
223 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
225 payload1 = sendPayloadData(leaderActor, "one");
227 // Verify the leader applies the states.
228 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
230 assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
232 MessageCollectorActor.clearMessages(leaderCollectorActor);
233 MessageCollectorActor.clearMessages(follower1CollectorActor);