Bug 3570: Persist snapshot on follower ApplySnapshot
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RecoveryIntegrationTest.java
/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import akka.actor.ActorRef;
12 import akka.persistence.SaveSnapshotSuccess;
13 import com.google.common.collect.ImmutableMap;
14 import java.util.Arrays;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import org.junit.Before;
19 import org.junit.Test;
20 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
21 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
22 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
23 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
24 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
25 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
26 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
27 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
28
29 /**
30  * Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication.
31  *
32  * @author Thomas Pantelis
33  */
34 public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
35
36     private MockPayload payload0;
37     private MockPayload payload1;
38
39     @Before
40     public void setup() {
41         follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
42                 newFollowerConfigParams());
43
44         Map<String, String> peerAddresses = new HashMap<>();
45         peerAddresses.put(follower1Id, follower1Actor.path().toString());
46         peerAddresses.put(follower2Id, "");
47
48         leaderConfigParams = newLeaderConfigParams();
49         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
50
51         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
52         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
53
54         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
55     }
56
57     @Test
58     public void testStatePersistedBetweenSnapshotCaptureAndPersist() {
59
60         send2InitialPayloads();
61
62         // Block these messages initially so we can control the sequence.
63         leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
64         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
65
66         MockPayload payload2 = sendPayloadData(leaderActor, "two");
67
68         // This should trigger a snapshot.
69         MockPayload payload3 = sendPayloadData(leaderActor, "three");
70
71         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
72
73         // Send another payload.
74         MockPayload payload4 = sendPayloadData(leaderActor, "four");
75
76         // Now deliver the AppendEntries to the follower
77         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
78
79         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
80
81         // Now deliver the CaptureSnapshotReply to the leader.
82         CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
83                 leaderCollectorActor, CaptureSnapshotReply.class);
84         leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
85         leaderActor.tell(captureSnapshotReply, leaderActor);
86
87         // Wait for snapshot complete.
88         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
89
90         reinstateLeaderActor();
91
92         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
93         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
94         assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
95         assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
96         assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
97         assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
98
99         assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
100                 leaderActor.underlyingActor().getState());
101     }
102
103     @Test
104     public void testStatePersistedAfterSnapshotPersisted() {
105
106         send2InitialPayloads();
107
108         // Block these messages initially so we can control the sequence.
109         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
110
111         MockPayload payload2 = sendPayloadData(leaderActor, "two");
112
113         // This should trigger a snapshot.
114         MockPayload payload3 = sendPayloadData(leaderActor, "three");
115
116         // Send another payload.
117         MockPayload payload4 = sendPayloadData(leaderActor, "four");
118
119         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
120
121         // Wait for snapshot complete.
122         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
123
124         // Now deliver the AppendEntries to the follower
125         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
126
127         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
128
129         reinstateLeaderActor();
130
131         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
132         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
133         assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
134         assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
135         assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
136         assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
137
138         assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
139                 leaderActor.underlyingActor().getState());
140     }
141
142     @Test
143     public void testFollowerRecoveryAfterInstallSnapshot() throws Exception {
144
145         send2InitialPayloads();
146
147         leader = leaderActor.underlyingActor().getCurrentBehavior();
148
149         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
150                 newFollowerConfigParams());
151         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
152
153         leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
154
155         MockPayload payload2 = sendPayloadData(leaderActor, "two");
156
157         // Verify the leader applies the 3rd payload state.
158         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
159
160         MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyJournalEntries.class, 1);
161
162         assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
163         assertEquals("Leader last applied", 2, leaderContext.getLastApplied());
164         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
165         assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
166
167         killActor(follower2Actor);
168
169         InMemoryJournal.clear();
170
171         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
172                 newFollowerConfigParams());
173         TestRaftActor follower2Underlying = follower2Actor.underlyingActor();
174         follower2CollectorActor = follower2Underlying.collectorActor();
175         follower2Context = follower2Underlying.getRaftActorContext();
176
177         leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
178
179         // The leader should install a snapshot so wait for the follower to receive ApplySnapshot.
180         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
181
182         // Wait for the follower to persist the snapshot.
183         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
184
185         // The last applied entry on the leader is included in the snapshot but is also sent in a subsequent
186         // AppendEntries because the InstallSnapshot message lastIncludedIndex field is set to the leader's
187         // snapshotIndex and not the actual last index included in the snapshot.
188         // FIXME? - is this OK?
189         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
190         List<MockPayload> expFollowerState = Arrays.asList(payload0, payload1, payload2, payload2);
191
192         assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
193         assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
194         assertEquals("Follower snapshot index", 1, follower2Context.getReplicatedLog().getSnapshotIndex());
195         assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
196
197         killActor(follower2Actor);
198
199         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
200                 newFollowerConfigParams());
201
202         follower2Underlying = follower2Actor.underlyingActor();
203         follower2Underlying.waitForRecoveryComplete();
204         follower2Context = follower2Underlying.getRaftActorContext();
205
206         assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
207         assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
208         assertEquals("Follower snapshot index", 1, follower2Context.getReplicatedLog().getSnapshotIndex());
209         assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
210     }
211
212     private void reinstateLeaderActor() {
213         killActor(leaderActor);
214
215         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
216
217         leaderActor.underlyingActor().waitForRecoveryComplete();
218
219         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
220     }
221
222     private void send2InitialPayloads() {
223         waitUntilLeader(leaderActor);
224         currentTerm = leaderContext.getTermInformation().getCurrentTerm();
225
226         payload0 = sendPayloadData(leaderActor, "zero");
227
228         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
229
230         payload1 = sendPayloadData(leaderActor, "one");
231
232         // Verify the leader applies the states.
233         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
234
235         assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
236
237         MessageCollectorActor.clearMessages(leaderCollectorActor);
238         MessageCollectorActor.clearMessages(follower1CollectorActor);
239     }
240 }