Merge "Bug 2948: Recovered log entries not applied after prior snapshot"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RecoveryIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import akka.persistence.SaveSnapshotSuccess;
12 import com.google.common.collect.ImmutableMap;
13 import java.util.Arrays;
14 import org.junit.Before;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
17 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
18 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
19 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
20 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
21 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
22
23 /**
24  * Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication.
25  *
26  * @author Thomas Pantelis
27  */
28 public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
29
30     private MockPayload payload0;
31     private MockPayload payload1;
32
33     @Before
34     public void setup() {
35         follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
36                 newFollowerConfigParams());
37
38         peerAddresses = ImmutableMap.<String, String>builder().
39                 put(follower1Id, follower1Actor.path().toString()).build();
40
41         leaderConfigParams = newLeaderConfigParams();
42         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
43
44         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
45         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
46
47         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
48     }
49
50     @Test
51     public void testStatePersistedBetweenSnapshotCaptureAndPersist() {
52
53         send2InitialPayloads();
54
55         // Block these messages initially so we can control the sequence.
56         leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
57         leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
58         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
59
60         MockPayload payload2 = sendPayloadData(leaderActor, "two");
61
62         // This should trigger a snapshot.
63         MockPayload payload3 = sendPayloadData(leaderActor, "three");
64
65         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
66
67         CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
68                 leaderCollectorActor, CaptureSnapshot.class);
69
70         // First, deliver the CaptureSnapshot to the leader.
71         leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
72         leaderActor.tell(captureSnapshot, leaderActor);
73
74         // Send another payload.
75         MockPayload payload4 = sendPayloadData(leaderActor, "four");
76
77         // Now deliver the AppendEntries to the follower
78         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
79
80         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
81
82         // Now deliver the CaptureSnapshotReply to the leader.
83         CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
84                 leaderCollectorActor, CaptureSnapshotReply.class);
85         leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
86         leaderActor.tell(captureSnapshotReply, leaderActor);
87
88         // Wait for snapshot complete.
89         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
90
91         reinstateLeaderActor();
92
93         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
94         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
95         assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
96         assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
97         assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
98         assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
99
100         assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
101                 leaderActor.underlyingActor().getState());
102     }
103
104     @Test
105     public void testStatePersistedBetweenInitiateSnapshotAndCapture() {
106
107         send2InitialPayloads();
108
109         // Block these messages initially so we can control the sequence.
110         leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
111         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
112
113         MockPayload payload2 = sendPayloadData(leaderActor, "two");
114
115         // This should trigger a snapshot.
116         MockPayload payload3 = sendPayloadData(leaderActor, "three");
117
118         // Send another payload.
119         MockPayload payload4 = sendPayloadData(leaderActor, "four");
120
121         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
122
123         CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
124                 leaderCollectorActor, CaptureSnapshot.class);
125
126         // First, deliver the AppendEntries to the follower
127         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
128
129         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
130
131         // Now deliver the CaptureSnapshot to the leader.
132         leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
133         leaderActor.tell(captureSnapshot, leaderActor);
134
135         // Wait for snapshot complete.
136         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
137
138         reinstateLeaderActor();
139
140         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
141         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
142         assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
143         assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
144         assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
145         assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
146
147         // payloads 2, 3, and 4 were applied after the snapshot was initiated and before it was captured so
148         // were included in the snapshot. They were also included as unapplied entries in the snapshot as
149         // they weren't yet applied to the state at the time the snapshot was initiated. They were applied to the
150         // state on recovery by the ApplyJournalEntries messages which remained in the persisted log.
151         // This is a side effect of trimming the persisted log to the sequence number captured at the time
152         // the snapshot was initiated.
153         assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4, payload2,
154                 payload3, payload4), leaderActor.underlyingActor().getState());
155     }
156
157     @Test
158     public void testApplyJournalEntriesPersistedAfterSnapshotPersisted() {
159
160         send2InitialPayloads();
161
162         // Block these messages initially so we can control the sequence.
163         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
164
165         MockPayload payload2 = sendPayloadData(leaderActor, "two");
166
167         // This should trigger a snapshot.
168         MockPayload payload3 = sendPayloadData(leaderActor, "three");
169
170         // Send another payload.
171         MockPayload payload4 = sendPayloadData(leaderActor, "four");
172
173         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
174
175         // Wait for snapshot complete.
176         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
177
178         // Now deliver the AppendEntries to the follower
179         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
180
181         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
182
183         reinstateLeaderActor();
184
185         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
186         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
187         assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
188         assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
189         assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
190         assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
191
192         assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
193                 leaderActor.underlyingActor().getState());
194     }
195
196     private void reinstateLeaderActor() {
197         killActor(leaderActor);
198
199         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
200
201         leaderActor.underlyingActor().waitForRecoveryComplete();
202
203         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
204     }
205
206     private void send2InitialPayloads() {
207         waitUntilLeader(leaderActor);
208         currentTerm = leaderContext.getTermInformation().getCurrentTerm();
209
210         payload0 = sendPayloadData(leaderActor, "zero");
211         payload1 = sendPayloadData(leaderActor, "one");
212
213         // Verify the leader applies the states.
214         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
215
216         assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
217
218         MessageCollectorActor.clearMessages(leaderCollectorActor);
219         MessageCollectorActor.clearMessages(follower1CollectorActor);
220     }
221 }