Cleanup test format
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RecoveryIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11
12 import akka.actor.ActorRef;
13 import akka.persistence.SaveSnapshotSuccess;
14 import com.google.common.collect.ImmutableMap;
15 import java.util.Arrays;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import org.junit.Before;
20 import org.junit.Test;
21 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
22 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
23 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
24 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
25 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
26 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
27 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
28
29 /**
30  * Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication.
31  *
32  * @author Thomas Pantelis
33  */
34 public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
35
36     private MockPayload payload0;
37     private MockPayload payload1;
38
39     @Before
40     public void setup() {
41         follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
42                 newFollowerConfigParams());
43
44         Map<String, String> leaderPeerAddresses = new HashMap<>();
45         leaderPeerAddresses.put(follower1Id, follower1Actor.path().toString());
46         leaderPeerAddresses.put(follower2Id, "");
47
48         leaderConfigParams = newLeaderConfigParams();
49         leaderActor = newTestRaftActor(leaderId, leaderPeerAddresses, leaderConfigParams);
50
51         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
52         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
53
54         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
55     }
56
57     @Test
58     public void testStatePersistedBetweenSnapshotCaptureAndPersist() {
59
60         send2InitialPayloads();
61
62         // Block these messages initially so we can control the sequence.
63         leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
64         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
65
66         final MockPayload payload2 = sendPayloadData(leaderActor, "two");
67
68         // This should trigger a snapshot.
69         final MockPayload payload3 = sendPayloadData(leaderActor, "three");
70
71         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
72
73         // Send another payload.
74         final MockPayload payload4 = sendPayloadData(leaderActor, "four");
75
76         // Now deliver the AppendEntries to the follower
77         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
78
79         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
80
81         // Now deliver the CaptureSnapshotReply to the leader.
82         CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
83                 leaderCollectorActor, CaptureSnapshotReply.class);
84         leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
85         leaderActor.tell(captureSnapshotReply, leaderActor);
86
87         // Wait for snapshot complete.
88         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
89
90         reinstateLeaderActor();
91
92         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
93         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
94         assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
95         assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
96         assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
97         assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
98
99         assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
100                 leaderActor.underlyingActor().getState());
101     }
102
103     @Test
104     public void testStatePersistedAfterSnapshotPersisted() {
105
106         send2InitialPayloads();
107
108         // Block these messages initially so we can control the sequence.
109         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
110
111         final MockPayload payload2 = sendPayloadData(leaderActor, "two");
112
113         // This should trigger a snapshot.
114         final MockPayload payload3 = sendPayloadData(leaderActor, "three");
115
116         // Send another payload.
117         final MockPayload payload4 = sendPayloadData(leaderActor, "four");
118
119         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
120
121         // Wait for snapshot complete.
122         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
123
124         // Now deliver the AppendEntries to the follower
125         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
126
127         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
128
129         reinstateLeaderActor();
130
131         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
132         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
133         assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
134         assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
135         assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
136         assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
137
138         assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
139                 leaderActor.underlyingActor().getState());
140     }
141
142     @Test
143     public void testFollowerRecoveryAfterInstallSnapshot() {
144
145         send2InitialPayloads();
146
147         leader = leaderActor.underlyingActor().getCurrentBehavior();
148
149         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
150                 newFollowerConfigParams());
151         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
152
153         leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
154
155         final MockPayload payload2 = sendPayloadData(leaderActor, "two");
156
157         // Verify the leader applies the 3rd payload state.
158         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
159
160         MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyJournalEntries.class, 1);
161
162         assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
163         assertEquals("Leader last applied", 2, leaderContext.getLastApplied());
164         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
165         assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
166
167         killActor(follower2Actor);
168
169         InMemoryJournal.clear();
170
171         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
172                 newFollowerConfigParams());
173         TestRaftActor follower2Underlying = follower2Actor.underlyingActor();
174         follower2CollectorActor = follower2Underlying.collectorActor();
175         follower2Context = follower2Underlying.getRaftActorContext();
176
177         leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
178
179         // The leader should install a snapshot so wait for the follower to receive ApplySnapshot.
180         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
181
182         // Wait for the follower to persist the snapshot.
183         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
184
185         final List<MockPayload> expFollowerState = Arrays.asList(payload0, payload1, payload2);
186
187         assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
188         assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
189         assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
190         assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
191
192         killActor(follower2Actor);
193
194         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
195                 newFollowerConfigParams());
196
197         follower2Underlying = follower2Actor.underlyingActor();
198         follower2Underlying.waitForRecoveryComplete();
199         follower2Context = follower2Underlying.getRaftActorContext();
200
201         assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
202         assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
203         assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
204         assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
205     }
206
207     @Test
208     public void testRecoveryDeleteEntries() {
209         send2InitialPayloads();
210
211         sendPayloadData(leaderActor, "two");
212
213         // This should trigger a snapshot.
214         sendPayloadData(leaderActor, "three");
215
216         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
217         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
218
219         // Disconnect follower from leader
220         killActor(follower1Actor);
221
222         // Send another payloads
223         sendPayloadData(leaderActor, "four");
224         sendPayloadData(leaderActor, "five");
225
226         verifyRaftState(leaderActor, raftState -> {
227             assertEquals("leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
228         });
229
230         // Remove entries started from 4 index
231         leaderActor.underlyingActor().getReplicatedLog().removeFromAndPersist(4);
232
233         verifyRaftState(leaderActor, raftState -> {
234             assertEquals("leader journal last index", 3, leaderContext.getReplicatedLog().lastIndex());
235         });
236
237         // Send new payloads
238         final MockPayload payload4 = sendPayloadData(leaderActor, "newFour");
239         final MockPayload payload5 = sendPayloadData(leaderActor, "newFive");
240
241         verifyRaftState(leaderActor, raftState -> {
242             assertEquals("leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
243         });
244
245         reinstateLeaderActor();
246
247         assertEquals("Leader last index", 5 , leaderActor.underlyingActor().getReplicatedLog().lastIndex());
248         assertEquals(payload4, leaderActor.underlyingActor().getReplicatedLog().get(4).getData());
249         assertEquals(payload5, leaderActor.underlyingActor().getReplicatedLog().get(5).getData());
250     }
251
252     private void reinstateLeaderActor() {
253         killActor(leaderActor);
254
255         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
256
257         leaderActor.underlyingActor().waitForRecoveryComplete();
258
259         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
260     }
261
262     private void send2InitialPayloads() {
263         waitUntilLeader(leaderActor);
264         currentTerm = leaderContext.getTermInformation().getCurrentTerm();
265
266         payload0 = sendPayloadData(leaderActor, "zero");
267
268         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
269
270         payload1 = sendPayloadData(leaderActor, "one");
271
272         // Verify the leader applies the states.
273         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
274
275         assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
276
277         MessageCollectorActor.clearMessages(leaderCollectorActor);
278         MessageCollectorActor.clearMessages(follower1CollectorActor);
279     }
280 }