Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RecoveryIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11
12 import akka.actor.ActorRef;
13 import akka.persistence.SaveSnapshotSuccess;
14 import java.util.List;
15 import java.util.Map;
16 import org.junit.Before;
17 import org.junit.Test;
18 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
19 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
20 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
21 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
22 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
23 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
24 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
25
26 /**
27  * Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication.
28  *
29  * @author Thomas Pantelis
30  */
31 public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
32
33     private MockPayload payload0;
34     private MockPayload payload1;
35
36     @Before
37     public void setup() {
38         follower1Actor = newTestRaftActor(follower1Id, Map.of(leaderId, testActorPath(leaderId)),
39                 newFollowerConfigParams());
40
41         leaderConfigParams = newLeaderConfigParams();
42         leaderActor = newTestRaftActor(leaderId, Map.of(follower1Id, follower1Actor.path().toString(), follower2Id, ""),
43             leaderConfigParams);
44
45         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
46         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
47
48         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
49     }
50
51     @Test
52     public void testStatePersistedBetweenSnapshotCaptureAndPersist() {
53
54         send2InitialPayloads();
55
56         // Block these messages initially so we can control the sequence.
57         leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
58         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
59
60         final MockPayload payload2 = sendPayloadData(leaderActor, "two");
61
62         // This should trigger a snapshot.
63         final MockPayload payload3 = sendPayloadData(leaderActor, "three");
64
65         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
66
67         // Send another payload.
68         final MockPayload payload4 = sendPayloadData(leaderActor, "four");
69
70         // Now deliver the AppendEntries to the follower
71         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
72
73         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
74
75         // Now deliver the CaptureSnapshotReply to the leader.
76         CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
77                 leaderCollectorActor, CaptureSnapshotReply.class);
78         leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
79         leaderActor.tell(captureSnapshotReply, leaderActor);
80
81         // Wait for snapshot complete.
82         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
83
84         reinstateLeaderActor();
85
86         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
87         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
88         assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
89         assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
90         assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
91         assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
92
93         assertEquals("Leader state", List.of(payload0, payload1, payload2, payload3, payload4),
94                 leaderActor.underlyingActor().getState());
95     }
96
97     @Test
98     public void testStatePersistedAfterSnapshotPersisted() {
99
100         send2InitialPayloads();
101
102         // Block these messages initially so we can control the sequence.
103         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
104
105         final MockPayload payload2 = sendPayloadData(leaderActor, "two");
106
107         // This should trigger a snapshot.
108         final MockPayload payload3 = sendPayloadData(leaderActor, "three");
109
110         // Send another payload.
111         final MockPayload payload4 = sendPayloadData(leaderActor, "four");
112
113         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
114
115         // Wait for snapshot complete.
116         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
117
118         // Now deliver the AppendEntries to the follower
119         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
120
121         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
122
123         reinstateLeaderActor();
124
125         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
126         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
127         assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
128         assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
129         assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
130         assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
131
132         assertEquals("Leader state", List.of(payload0, payload1, payload2, payload3, payload4),
133                 leaderActor.underlyingActor().getState());
134     }
135
136     @Test
137     public void testFollowerRecoveryAfterInstallSnapshot() {
138
139         send2InitialPayloads();
140
141         leader = leaderActor.underlyingActor().getCurrentBehavior();
142
143         follower2Actor = newTestRaftActor(follower2Id,
144                 Map.of(leaderId, testActorPath(leaderId)), newFollowerConfigParams());
145         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
146
147         leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
148
149         final MockPayload payload2 = sendPayloadData(leaderActor, "two");
150
151         // Verify the leader applies the 3rd payload state.
152         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
153
154         MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyJournalEntries.class, 1);
155
156         assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
157         assertEquals("Leader last applied", 2, leaderContext.getLastApplied());
158         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
159         assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
160
161         killActor(follower2Actor);
162
163         InMemoryJournal.clear();
164
165         follower2Actor = newTestRaftActor(follower2Id,
166                 Map.of(leaderId, testActorPath(leaderId)), newFollowerConfigParams());
167         TestRaftActor follower2Underlying = follower2Actor.underlyingActor();
168         follower2CollectorActor = follower2Underlying.collectorActor();
169         follower2Context = follower2Underlying.getRaftActorContext();
170
171         leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
172
173         // The leader should install a snapshot so wait for the follower to receive ApplySnapshot.
174         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
175
176         // Wait for the follower to persist the snapshot.
177         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
178
179         final List<MockPayload> expFollowerState = List.of(payload0, payload1, payload2);
180
181         assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
182         assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
183         assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
184         assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
185
186         killActor(follower2Actor);
187
188         follower2Actor = newTestRaftActor(follower2Id, Map.of(leaderId, testActorPath(leaderId)),
189                 newFollowerConfigParams());
190
191         follower2Underlying = follower2Actor.underlyingActor();
192         follower2Underlying.waitForRecoveryComplete();
193         follower2Context = follower2Underlying.getRaftActorContext();
194
195         assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
196         assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
197         assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
198         assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
199     }
200
201     @Test
202     public void testRecoveryDeleteEntries() {
203         send2InitialPayloads();
204
205         sendPayloadData(leaderActor, "two");
206
207         // This should trigger a snapshot.
208         sendPayloadData(leaderActor, "three");
209
210         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
211         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
212
213         // Disconnect follower from leader
214         killActor(follower1Actor);
215
216         // Send another payloads
217         sendPayloadData(leaderActor, "four");
218         sendPayloadData(leaderActor, "five");
219
220         verifyRaftState(leaderActor, raftState -> {
221             assertEquals("leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
222         });
223
224         // Remove entries started from 4 index
225         leaderActor.underlyingActor().getReplicatedLog().removeFromAndPersist(4);
226
227         verifyRaftState(leaderActor, raftState -> {
228             assertEquals("leader journal last index", 3, leaderContext.getReplicatedLog().lastIndex());
229         });
230
231         // Send new payloads
232         final MockPayload payload4 = sendPayloadData(leaderActor, "newFour");
233         final MockPayload payload5 = sendPayloadData(leaderActor, "newFive");
234
235         verifyRaftState(leaderActor, raftState -> {
236             assertEquals("leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
237         });
238
239         reinstateLeaderActor();
240
241         final var log = leaderActor.underlyingActor().getReplicatedLog();
242         assertEquals("Leader last index", 5, log.lastIndex());
243         assertEquals(List.of(payload4, payload5), List.of(log.get(4).getData(), log.get(5).getData()));
244     }
245
246     private void reinstateLeaderActor() {
247         killActor(leaderActor);
248
249         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
250
251         leaderActor.underlyingActor().waitForRecoveryComplete();
252
253         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
254     }
255
256     private void send2InitialPayloads() {
257         waitUntilLeader(leaderActor);
258         currentTerm = leaderContext.getTermInformation().getCurrentTerm();
259
260         payload0 = sendPayloadData(leaderActor, "zero");
261
262         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
263
264         payload1 = sendPayloadData(leaderActor, "one");
265
266         // Verify the leader applies the states.
267         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
268
269         assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
270
271         MessageCollectorActor.clearMessages(leaderCollectorActor);
272         MessageCollectorActor.clearMessages(follower1CollectorActor);
273     }
274 }