Bug 3570: Use SnapShot lastAppliedIndex for install snapshot
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RecoveryIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import akka.actor.ActorRef;
12 import akka.persistence.SaveSnapshotSuccess;
13 import com.google.common.collect.ImmutableMap;
14 import java.util.Arrays;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import org.junit.Before;
19 import org.junit.Test;
20 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
21 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
22 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
23 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
24 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
25 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
26 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
27
28 /**
29  * Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication.
30  *
31  * @author Thomas Pantelis
32  */
33 public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
34
35     private MockPayload payload0;
36     private MockPayload payload1;
37
38     @Before
39     public void setup() {
40         follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
41                 newFollowerConfigParams());
42
43         Map<String, String> peerAddresses = new HashMap<>();
44         peerAddresses.put(follower1Id, follower1Actor.path().toString());
45         peerAddresses.put(follower2Id, "");
46
47         leaderConfigParams = newLeaderConfigParams();
48         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
49
50         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
51         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
52
53         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
54     }
55
56     @Test
57     public void testStatePersistedBetweenSnapshotCaptureAndPersist() {
58
59         send2InitialPayloads();
60
61         // Block these messages initially so we can control the sequence.
62         leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
63         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
64
65         MockPayload payload2 = sendPayloadData(leaderActor, "two");
66
67         // This should trigger a snapshot.
68         MockPayload payload3 = sendPayloadData(leaderActor, "three");
69
70         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
71
72         // Send another payload.
73         MockPayload payload4 = sendPayloadData(leaderActor, "four");
74
75         // Now deliver the AppendEntries to the follower
76         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
77
78         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
79
80         // Now deliver the CaptureSnapshotReply to the leader.
81         CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
82                 leaderCollectorActor, CaptureSnapshotReply.class);
83         leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
84         leaderActor.tell(captureSnapshotReply, leaderActor);
85
86         // Wait for snapshot complete.
87         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
88
89         reinstateLeaderActor();
90
91         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
92         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
93         assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
94         assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
95         assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
96         assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
97
98         assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
99                 leaderActor.underlyingActor().getState());
100     }
101
102     @Test
103     public void testStatePersistedAfterSnapshotPersisted() {
104
105         send2InitialPayloads();
106
107         // Block these messages initially so we can control the sequence.
108         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
109
110         MockPayload payload2 = sendPayloadData(leaderActor, "two");
111
112         // This should trigger a snapshot.
113         MockPayload payload3 = sendPayloadData(leaderActor, "three");
114
115         // Send another payload.
116         MockPayload payload4 = sendPayloadData(leaderActor, "four");
117
118         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
119
120         // Wait for snapshot complete.
121         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
122
123         // Now deliver the AppendEntries to the follower
124         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
125
126         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
127
128         reinstateLeaderActor();
129
130         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
131         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
132         assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
133         assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
134         assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
135         assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
136
137         assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
138                 leaderActor.underlyingActor().getState());
139     }
140
141     @Test
142     public void testFollowerRecoveryAfterInstallSnapshot() throws Exception {
143
144         send2InitialPayloads();
145
146         leader = leaderActor.underlyingActor().getCurrentBehavior();
147
148         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
149                 newFollowerConfigParams());
150         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
151
152         leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
153
154         MockPayload payload2 = sendPayloadData(leaderActor, "two");
155
156         // Verify the leader applies the 3rd payload state.
157         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
158
159         MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyJournalEntries.class, 1);
160
161         assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
162         assertEquals("Leader last applied", 2, leaderContext.getLastApplied());
163         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
164         assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
165
166         killActor(follower2Actor);
167
168         InMemoryJournal.clear();
169
170         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
171                 newFollowerConfigParams());
172         TestRaftActor follower2Underlying = follower2Actor.underlyingActor();
173         follower2CollectorActor = follower2Underlying.collectorActor();
174         follower2Context = follower2Underlying.getRaftActorContext();
175
176         leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
177
178         // The leader should install a snapshot so wait for the follower to receive ApplySnapshot.
179         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
180
181         // Wait for the follower to persist the snapshot.
182         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
183
184         List<MockPayload> expFollowerState = Arrays.asList(payload0, payload1, payload2);
185
186         assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
187         assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
188         assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
189         assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
190
191         killActor(follower2Actor);
192
193         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
194                 newFollowerConfigParams());
195
196         follower2Underlying = follower2Actor.underlyingActor();
197         follower2Underlying.waitForRecoveryComplete();
198         follower2Context = follower2Underlying.getRaftActorContext();
199
200         assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
201         assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
202         assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
203         assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
204     }
205
206     private void reinstateLeaderActor() {
207         killActor(leaderActor);
208
209         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
210
211         leaderActor.underlyingActor().waitForRecoveryComplete();
212
213         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
214     }
215
216     private void send2InitialPayloads() {
217         waitUntilLeader(leaderActor);
218         currentTerm = leaderContext.getTermInformation().getCurrentTerm();
219
220         payload0 = sendPayloadData(leaderActor, "zero");
221
222         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
223
224         payload1 = sendPayloadData(leaderActor, "one");
225
226         // Verify the leader applies the states.
227         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
228
229         assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
230
231         MessageCollectorActor.clearMessages(leaderCollectorActor);
232         MessageCollectorActor.clearMessages(follower1CollectorActor);
233     }
234 }