Force install snapshot when follower log is ahead
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / NonVotingFollowerIntegrationTest.java
1 /*
2  * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import com.google.common.base.Optional;
12 import com.google.common.collect.ImmutableMap;
13 import com.google.common.collect.Sets;
14 import java.util.Arrays;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
17 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
18 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
19 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
20 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
21 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
22 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
23 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
24
25 /**
26  * Integration test for various scenarios involving non-voting followers.
27  *
28  * @author Thomas Pantelis
29  */
30 public class NonVotingFollowerIntegrationTest extends AbstractRaftActorIntegrationTest {
31     private TestRaftActor followerInstance;
32     private TestRaftActor leaderInstance;
33
34     /**
35      * Tests non-voting follower re-sync after the non-persistent leader restarts with an empty log. In this
36      * case the follower's log will be ahead of the leader's log as the follower retains the previous
37      * data in memory. The leader must force an install snapshot to re-sync the follower's state.
38      */
39     @Test
40     public void testFollowerResyncWithEmptyLeaderLogAfterNonPersistentLeaderRestart() {
41         testLog.info("testFollowerResyncWithEmptyLeaderLogAfterNonPersistentLeaderRestart starting");
42
43         setupLeaderAndNonVotingFollower();
44
45         // Add log entries and verify they are committed and applied by both nodes.
46
47         expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
48         expSnapshotState.add(sendPayloadData(leaderActor, "one"));
49         expSnapshotState.add(sendPayloadData(leaderActor, "two"));
50
51         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
52         MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
53
54         assertEquals("Leader journal lastIndex", 2, leaderContext.getReplicatedLog().lastIndex());
55         assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
56         assertEquals("Follower journal lastIndex", 2, follower1Context.getReplicatedLog().lastIndex());
57         assertEquals("Follower commit index", 2, follower1Context.getCommitIndex());
58         assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
59
60         // Persisted journal should only contain the ServerConfigurationPayload and 2 UpdateElectionTerm entries.
61         assertEquals("Leader persisted journal size", 3, InMemoryJournal.get(leaderId).size());
62
63         // Restart the leader
64
65         killActor(leaderActor);
66         MessageCollectorActor.clearMessages(follower1CollectorActor);
67
68         createNewLeaderActor();
69
70         //follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
71
72         currentTerm++;
73         assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
74         assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
75         assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
76
77         // After restart, the leader's log and the follower's log will be ahead so the leader should force an
78         // install snapshot to re-sync the follower's log and state.
79
80         MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
81
82         assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
83         assertEquals("Follower journal lastIndex", -1, follower1Context.getReplicatedLog().lastIndex());
84         assertEquals("Follower commit index", -1, follower1Context.getCommitIndex());
85
86         expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
87
88         MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
89
90         assertEquals("Follower journal lastIndex", 0, follower1Context.getReplicatedLog().lastIndex());
91         assertEquals("Follower commit index", 0, follower1Context.getCommitIndex());
92         assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
93
94         testLog.info("testFollowerResyncWithEmptyLeaderLogAfterNonPersistentLeaderRestart ending");
95     }
96
97     /**
98      * Tests non-voting follower re-sync after the non-persistent leader restarts and commits new log
99      * entries prior to re-connecting to the follower. The leader's last index will still be less than the
100      * follower's last index corresponding to the previous data retained in memory. So the follower's log
101      * will be ahead of the leader's log and the leader must force an install snapshot to re-sync the
102      * follower's state.
103      */
104     @Test
105     public void testFollowerResyncWithLessLeaderLogEntriesAfterNonPersistentLeaderRestart() {
106         testLog.info("testFollowerResyncWithLessLeaderLogEntriesAfterNonPersistentLeaderRestart starting");
107
108         setupLeaderAndNonVotingFollower();
109
110         // Add log entries and verify they are committed and applied by both nodes.
111
112         expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
113         expSnapshotState.add(sendPayloadData(leaderActor, "one"));
114         expSnapshotState.add(sendPayloadData(leaderActor, "two"));
115
116         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
117         MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
118
119         assertEquals("Leader journal lastIndex", 2, leaderContext.getReplicatedLog().lastIndex());
120         assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
121         assertEquals("Follower journal lastIndex", 2, follower1Context.getReplicatedLog().lastIndex());
122         assertEquals("Follower commit index", 2, follower1Context.getCommitIndex());
123         assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
124
125         // Restart the leader
126
127         killActor(leaderActor);
128         MessageCollectorActor.clearMessages(follower1CollectorActor);
129
130         // Temporarily drop AppendEntries to simulate a disconnect when the leader restarts.
131         followerInstance.startDropMessages(AppendEntries.class);
132
133         createNewLeaderActor();
134
135         currentTerm++;
136         assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
137         assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
138         assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
139
140         // Add new log entries to the leader - one less than the prior log entries
141
142         expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
143         expSnapshotState.add(sendPayloadData(leaderActor, "one-1"));
144
145         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
146         assertEquals("Leader journal lastIndex", 1, leaderContext.getReplicatedLog().lastIndex());
147         assertEquals("Leader commit index", 1, leaderContext.getCommitIndex());
148
149         // Re-enable AppendEntries to the follower. The leaders previous index will be present in the
150         // follower's but the terms won't match and the follower's log will be ahead of the leader's log
151         // The leader should force an install snapshot to re-sync the entire follower's log and state.
152
153         followerInstance.stopDropMessages(AppendEntries.class);
154         MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
155
156         assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
157         assertEquals("Follower journal lastIndex", 1, follower1Context.getReplicatedLog().lastIndex());
158         assertEquals("Follower journal lastTerm", currentTerm, follower1Context.getReplicatedLog().lastTerm());
159         assertEquals("Follower commit index", 1, follower1Context.getCommitIndex());
160         assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
161
162         testLog.info("testFollowerResyncWithLessLeaderLogEntriesAfterNonPersistentLeaderRestart ending");
163     }
164
165     /**
166      * Tests non-voting follower re-sync after the non-persistent leader restarts and commits new log
167      * entries prior to re-connecting to the follower. The leader's last index will be 1 greater than the
168      * follower's last index corresponding to the previous data retained in memory. So the follower's log
169      * will be behind the leader's log but the leader's log entries will have a higher term. In this case the
170      * leader should force an install snapshot to re-sync the follower's state.
171      */
172     @Test
173     public void testFollowerResyncWithOneMoreLeaderLogEntryAfterNonPersistentLeaderRestart() {
174         testLog.info("testFollowerResyncWithOneMoreLeaderLogEntryAfterNonPersistentLeaderRestart starting");
175
176         setupLeaderAndNonVotingFollower();
177
178         // Add log entries and verify they are committed and applied by both nodes.
179
180         expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
181         expSnapshotState.add(sendPayloadData(leaderActor, "one"));
182
183         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
184         MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 2);
185
186         assertEquals("Leader journal lastIndex", 1, leaderContext.getReplicatedLog().lastIndex());
187         assertEquals("Leader commit index", 1, leaderContext.getCommitIndex());
188         assertEquals("Follower journal lastIndex", 1, follower1Context.getReplicatedLog().lastIndex());
189         assertEquals("Follower commit index", 1, follower1Context.getCommitIndex());
190         assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
191
192         // Restart the leader
193
194         killActor(leaderActor);
195         MessageCollectorActor.clearMessages(follower1CollectorActor);
196
197         // Temporarily drop AppendEntries to simulate a disconnect when the leader restarts.
198         followerInstance.startDropMessages(AppendEntries.class);
199
200         createNewLeaderActor();
201
202         currentTerm++;
203         assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
204         assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
205         assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
206
207         // Add new log entries to the leader - one more than the prior log entries
208
209         expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
210         expSnapshotState.add(sendPayloadData(leaderActor, "one-1"));
211         expSnapshotState.add(sendPayloadData(leaderActor, "two-1"));
212
213         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
214         assertEquals("Leader journal lastIndex", 2, leaderContext.getReplicatedLog().lastIndex());
215         assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
216         assertEquals("Leader replicatedToAllIndex", -1, leaderInstance.getCurrentBehavior().getReplicatedToAllIndex());
217
218         // Re-enable AppendEntries to the follower. The follower's log will be out of sync and it should
219         // should force the leader to install snapshot to re-sync the entire follower's log and state.
220
221         followerInstance.stopDropMessages(AppendEntries.class);
222         MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
223
224         assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
225         assertEquals("Follower journal lastIndex", 2, follower1Context.getReplicatedLog().lastIndex());
226         assertEquals("Follower journal lastTerm", currentTerm, follower1Context.getReplicatedLog().lastTerm());
227         assertEquals("Follower commit index", 2, follower1Context.getCommitIndex());
228         assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
229
230         testLog.info("testFollowerResyncWithOneMoreLeaderLogEntryAfterNonPersistentLeaderRestart ending");
231     }
232
233     /**
234      * Tests non-voting follower re-sync after the non-persistent leader restarts and commits new log
235      * entries prior to re-connecting to the follower. The leader's last index will be greater than the
236      * follower's last index corresponding to the previous data retained in memory. So the follower's log
237      * will be behind the leader's log but the leader's log entries will have a higher term. It also adds a
238      * "down" peer on restart so the leader doesn't trim its log as it's trying to resync the follower.
239      * Eventually the follower should force the leader to install snapshot to re-sync its state.
240      */
241     @Test
242     public void testFollowerResyncWithMoreLeaderLogEntriesAndDownPeerAfterNonPersistentLeaderRestart() {
243         testLog.info("testFollowerResyncWithMoreLeaderLogEntriesAndDownPeerAfterNonPersistentLeaderRestart starting");
244
245         setupLeaderAndNonVotingFollower();
246
247         // Add log entries and verify they are committed and applied by both nodes.
248
249         expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
250         expSnapshotState.add(sendPayloadData(leaderActor, "one"));
251         expSnapshotState.add(sendPayloadData(leaderActor, "two"));
252
253         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, expSnapshotState.size());
254         MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, expSnapshotState.size());
255
256         long lastIndex = 2;
257         assertEquals("Leader journal lastIndex", lastIndex, leaderContext.getReplicatedLog().lastIndex());
258         assertEquals("Leader commit index", lastIndex, leaderContext.getCommitIndex());
259         assertEquals("Follower journal lastIndex", lastIndex, follower1Context.getReplicatedLog().lastIndex());
260         assertEquals("Follower commit index", lastIndex, follower1Context.getCommitIndex());
261         assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
262
263         MessageCollectorActor.clearMessages(follower1CollectorActor);
264         MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
265         assertEquals("Follower snapshot index", lastIndex - 1, follower1Context.getReplicatedLog().getSnapshotIndex());
266         assertEquals("Follower journal size", 1, leaderContext.getReplicatedLog().size());
267
268         // Restart the leader
269
270         killActor(leaderActor);
271         MessageCollectorActor.clearMessages(follower1CollectorActor);
272
273         // Temporarily drop AppendEntries to simulate a disconnect when the leader restarts.
274         followerInstance.startDropMessages(AppendEntries.class);
275
276         // Add a "down" peer so the leader doesn't trim its log as it's trying to resync the follower. The
277         // leader will keep decrementing the follower's nextIndex to try to find a matching index. Since
278         // there is no matching index it will eventually hit index 0 which should cause the follower to
279         // force an install snapshot upon failure to remove the conflicting indexes due to indexes 0 and 1
280         // being in the prior snapshot and not the log.
281         //
282         // We also add another voting follower actor into the mix even though it shoildn't affect the
283         // outcome.
284         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
285                 new ServerInfo(leaderId, true), new ServerInfo(follower1Id, false),
286                 new ServerInfo(follower2Id, true), new ServerInfo("downPeer", false)));
287         ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, currentTerm,
288                 persistedServerConfig);
289
290         InMemoryJournal.clear();
291         InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(currentTerm, leaderId));
292         InMemoryJournal.addEntry(leaderId, 2, persistedServerConfigEntry);
293         InMemoryJournal.addEntry(follower2Id, 1, persistedServerConfigEntry);
294
295         DefaultConfigParamsImpl follower2ConfigParams = newFollowerConfigParams();
296         follower2ConfigParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
297         follower2Actor = newTestRaftActor(follower2Id, TestRaftActor.newBuilder().peerAddresses(
298                 ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString())).
299                     config(follower2ConfigParams).persistent(Optional.of(false)));
300         TestRaftActor follower2Instance = follower2Actor.underlyingActor();
301         follower2Instance.waitForRecoveryComplete();
302         follower2CollectorActor = follower2Instance.collectorActor();
303
304         peerAddresses = ImmutableMap.of(follower1Id, follower1Actor.path().toString(),
305                 follower2Id, follower2Actor.path().toString());
306
307         createNewLeaderActor();
308
309         currentTerm++;
310         assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
311         assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
312         assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
313
314         // Add new log entries to the leader - several more than the prior log entries
315
316         expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
317         expSnapshotState.add(sendPayloadData(leaderActor, "one-1"));
318         expSnapshotState.add(sendPayloadData(leaderActor, "two-1"));
319         expSnapshotState.add(sendPayloadData(leaderActor, "three-1"));
320         expSnapshotState.add(sendPayloadData(leaderActor, "four-1"));
321
322         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, expSnapshotState.size());
323         MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, expSnapshotState.size());
324
325         lastIndex = 4;
326         assertEquals("Leader journal lastIndex", lastIndex, leaderContext.getReplicatedLog().lastIndex());
327         assertEquals("Leader commit index", lastIndex, leaderContext.getCommitIndex());
328         assertEquals("Leader snapshot index", -1, leaderContext.getReplicatedLog().getSnapshotIndex());
329         assertEquals("Leader replicatedToAllIndex", -1, leaderInstance.getCurrentBehavior().getReplicatedToAllIndex());
330
331         // Re-enable AppendEntries to the follower. The follower's log will be out of sync and it should
332         // should force the leader to install snapshot to re-sync the entire follower's log and state.
333
334         followerInstance.stopDropMessages(AppendEntries.class);
335         MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
336
337         assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
338         assertEquals("Follower journal lastIndex", lastIndex, follower1Context.getReplicatedLog().lastIndex());
339         assertEquals("Follower journal lastTerm", currentTerm, follower1Context.getReplicatedLog().lastTerm());
340         assertEquals("Follower commit index", lastIndex, follower1Context.getCommitIndex());
341         assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
342
343         testLog.info("testFollowerResyncWithMoreLeaderLogEntriesAndDownPeerAfterNonPersistentLeaderRestart ending");
344     }
345
346     private void createNewLeaderActor() {
347         expSnapshotState.clear();
348         leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
349                 config(leaderConfigParams).persistent(Optional.of(false)));
350         leaderInstance = leaderActor.underlyingActor();
351         leaderCollectorActor = leaderInstance.collectorActor();
352         waitUntilLeader(leaderActor);
353         leaderContext = leaderInstance.getRaftActorContext();
354     }
355
356     private void setupLeaderAndNonVotingFollower() {
357         snapshotBatchCount = 100;
358         int initialTerm = 1;
359
360         // Set up a persisted ServerConfigurationPayload with the leader voting and the follower non-voting.
361
362         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
363                 new ServerInfo(leaderId, true), new ServerInfo(follower1Id, false)));
364         ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, initialTerm,
365                 persistedServerConfig);
366
367         InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId));
368         InMemoryJournal.addEntry(leaderId, 2, persistedServerConfigEntry);
369         InMemoryJournal.addEntry(follower1Id, 1, new UpdateElectionTerm(initialTerm, leaderId));
370         InMemoryJournal.addEntry(follower1Id, 2, persistedServerConfigEntry);
371
372         DefaultConfigParamsImpl followerConfigParams = newFollowerConfigParams();
373         follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
374                 ImmutableMap.of(leaderId, testActorPath(leaderId))).config(followerConfigParams).
375                     persistent(Optional.of(false)));
376
377         peerAddresses = ImmutableMap.<String, String>builder().
378                 put(follower1Id, follower1Actor.path().toString()).build();
379
380         leaderConfigParams = newLeaderConfigParams();
381         leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
382                 config(leaderConfigParams).persistent(Optional.of(false)));
383
384         followerInstance = follower1Actor.underlyingActor();
385         follower1CollectorActor = followerInstance.collectorActor();
386
387         leaderInstance = leaderActor.underlyingActor();
388         leaderCollectorActor = leaderInstance.collectorActor();
389
390         leaderContext = leaderInstance.getRaftActorContext();
391         follower1Context = followerInstance.getRaftActorContext();
392
393         waitUntilLeader(leaderActor);
394
395         // Verify leader's context after startup
396
397         currentTerm = initialTerm + 1;
398         assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
399         assertEquals("Leader server config", Sets.newHashSet(persistedServerConfig.getServerConfig()),
400                 Sets.newHashSet(leaderContext.getPeerServerInfo(true).getServerConfig()));
401         assertEquals("Leader isVotingMember", true, leaderContext.isVotingMember());
402
403         // Verify follower's context after startup
404
405         MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
406         assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
407         assertEquals("Follower server config", Sets.newHashSet(persistedServerConfig.getServerConfig()),
408                 Sets.newHashSet(follower1Context.getPeerServerInfo(true).getServerConfig()));
409         assertEquals("FollowerisVotingMember", false, follower1Context.isVotingMember());
410     }
411 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.