2 * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import com.google.common.base.Optional;
12 import com.google.common.collect.ImmutableMap;
13 import com.google.common.collect.Sets;
14 import java.util.Arrays;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
17 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
18 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
19 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
20 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
21 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
22 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
23 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
26 * Integration test for various scenarios involving non-voting followers.
28 * @author Thomas Pantelis
30 public class NonVotingFollowerIntegrationTest extends AbstractRaftActorIntegrationTest {
// Underlying TestRaftActor of follower1Actor; assigned in setupLeaderAndNonVotingFollower().
31 private TestRaftActor followerInstance;
// Underlying TestRaftActor of the current leaderActor; assigned in setupLeaderAndNonVotingFollower() and
// re-assigned by createNewLeaderActor() each time the leader is re-created.
32 private TestRaftActor leaderInstance;
35 * Tests non-voting follower re-sync after the non-persistent leader restarts with an empty log. In this
36 * case the follower's log will be ahead of the leader's log as the follower retains the previous
37 * data in memory. The leader must force an install snapshot to re-sync the follower's state.
// Scenario: the leader and the non-voting follower both apply three payloads, the leader actor is killed
// and re-created, and the follower is then expected to receive SnapshotComplete and end up with an empty
// log; one further payload ("zero-1") must afterwards be replicated to index 0 on the follower.
// NOTE(review): this listing is lossy - the embedded line numbers skip (e.g. 70 -> 73) and neither a test
// annotation nor a closing brace is visible for this method. Check the complete file before changing code.
40 public void testFollowerResyncWithEmptyLeaderLogAfterNonPersistentLeaderRestart() {
41 testLog.info("testFollowerResyncWithEmptyLeaderLogAfterNonPersistentLeaderRestart starting");
43 setupLeaderAndNonVotingFollower();
45 // Add log entries and verify they are committed and applied by both nodes.
47 expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
48 expSnapshotState.add(sendPayloadData(leaderActor, "one"));
49 expSnapshotState.add(sendPayloadData(leaderActor, "two"));
// Wait until each node's collector has seen three ApplyState messages, one per payload sent above.
51 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
52 MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
// Three entries were sent, so the last index and commit index are expected to be 2 on both nodes.
54 assertEquals("Leader journal lastIndex", 2, leaderContext.getReplicatedLog().lastIndex());
55 assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
56 assertEquals("Follower journal lastIndex", 2, follower1Context.getReplicatedLog().lastIndex());
57 assertEquals("Follower commit index", 2, follower1Context.getCommitIndex());
58 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
60 // Persisted journal should only contain the ServerConfigurationPayload and 2 UpdateElectionTerm entries.
61 assertEquals("Leader persisted journal size", 3, InMemoryJournal.get(leaderId).size());
// Restart the leader: kill the current actor, discard the follower's collected messages so later
// expectations only see post-restart traffic, then create a new leader actor.
65 killActor(leaderActor);
66 MessageCollectorActor.clearMessages(follower1CollectorActor);
68 createNewLeaderActor();
// NOTE(review): the next line is commented-out code; nothing visible in this test drops AppendEntries.
70 //follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
// NOTE(review): currentTerm is compared below, but no update to it is visible after setup (embedded
// lines 71-72 are missing from this listing) - confirm whether a statement was lost.
73 assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
74 assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
75 assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
77 // After restart, the leader's log will be empty and the follower's log will be ahead, so the leader
78 // should force an install snapshot to re-sync the follower's log and state.
80 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
// After the snapshot completes the follower is expected to mirror the restarted leader: empty log.
82 assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
83 assertEquals("Follower journal lastIndex", -1, follower1Context.getReplicatedLog().lastIndex());
84 assertEquals("Follower commit index", -1, follower1Context.getCommitIndex());
// createNewLeaderActor() cleared expSnapshotState, so from here it holds only the post-restart payload.
86 expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
88 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
90 assertEquals("Follower journal lastIndex", 0, follower1Context.getReplicatedLog().lastIndex());
91 assertEquals("Follower commit index", 0, follower1Context.getCommitIndex());
92 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
94 testLog.info("testFollowerResyncWithEmptyLeaderLogAfterNonPersistentLeaderRestart ending");
98 * Tests non-voting follower re-sync after the non-persistent leader restarts and commits new log
99 * entries prior to re-connecting to the follower. The leader's last index will still be less than the
100 * follower's last index corresponding to the previous data retained in memory. So the follower's log
101 * will be ahead of the leader's log and the leader must force an install snapshot to re-sync the follower's state.
// Scenario: both nodes apply three payloads; the leader actor is killed and re-created while the follower
// drops AppendEntries; the new leader commits two payloads (one fewer than before); AppendEntries are then
// re-enabled and the follower is expected to receive SnapshotComplete and match the leader's new log.
// NOTE(review): this listing is lossy - the embedded line numbers skip (e.g. 133 -> 136) and neither a test
// annotation nor a closing brace is visible for this method. Check the complete file before changing code.
105 public void testFollowerResyncWithLessLeaderLogEntriesAfterNonPersistentLeaderRestart() {
106 testLog.info("testFollowerResyncWithLessLeaderLogEntriesAfterNonPersistentLeaderRestart starting");
108 setupLeaderAndNonVotingFollower();
110 // Add log entries and verify they are committed and applied by both nodes.
112 expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
113 expSnapshotState.add(sendPayloadData(leaderActor, "one"));
114 expSnapshotState.add(sendPayloadData(leaderActor, "two"));
// Wait until each node's collector has seen three ApplyState messages, one per payload sent above.
116 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
117 MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
119 assertEquals("Leader journal lastIndex", 2, leaderContext.getReplicatedLog().lastIndex());
120 assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
121 assertEquals("Follower journal lastIndex", 2, follower1Context.getReplicatedLog().lastIndex());
122 assertEquals("Follower commit index", 2, follower1Context.getCommitIndex());
123 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
125 // Restart the leader
127 killActor(leaderActor);
// Discard the follower's collected messages so later expectations only see post-restart traffic.
128 MessageCollectorActor.clearMessages(follower1CollectorActor);
130 // Temporarily drop AppendEntries to simulate a disconnect when the leader restarts.
131 followerInstance.startDropMessages(AppendEntries.class);
133 createNewLeaderActor();
// NOTE(review): currentTerm is compared below, but no update to it is visible after setup (embedded
// lines 134-135 are missing from this listing) - confirm whether a statement was lost.
136 assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
137 assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
138 assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
140 // Add new log entries to the leader - one less than the prior log entries
// createNewLeaderActor() cleared expSnapshotState, so it now accumulates only the post-restart payloads.
142 expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
143 expSnapshotState.add(sendPayloadData(leaderActor, "one-1"));
// Only the leader is checked here; the follower is still dropping AppendEntries at this point.
145 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
146 assertEquals("Leader journal lastIndex", 1, leaderContext.getReplicatedLog().lastIndex());
147 assertEquals("Leader commit index", 1, leaderContext.getCommitIndex());
149 // Re-enable AppendEntries to the follower. The leader's previous index will be present in the
150 // follower's log but the terms won't match and the follower's log will be ahead of the leader's log.
151 // The leader should force an install snapshot to re-sync the entire follower's log and state.
153 followerInstance.stopDropMessages(AppendEntries.class);
154 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
// After the snapshot the follower is expected to match the leader's two post-restart entries.
156 assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
157 assertEquals("Follower journal lastIndex", 1, follower1Context.getReplicatedLog().lastIndex());
158 assertEquals("Follower journal lastTerm", currentTerm, follower1Context.getReplicatedLog().lastTerm());
159 assertEquals("Follower commit index", 1, follower1Context.getCommitIndex());
160 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
162 testLog.info("testFollowerResyncWithLessLeaderLogEntriesAfterNonPersistentLeaderRestart ending");
166 * Tests non-voting follower re-sync after the non-persistent leader restarts and commits new log
167 * entries prior to re-connecting to the follower. The leader's last index will be 1 greater than the
168 * follower's last index corresponding to the previous data retained in memory. So the follower's log
169 * will be behind the leader's log but the leader's log entries will have a higher term. In this case the
170 * leader should force an install snapshot to re-sync the follower's state.
// Scenario: both nodes apply two payloads; the leader actor is killed and re-created while the follower
// drops AppendEntries; the new leader commits three payloads (one more than before); AppendEntries are then
// re-enabled and the follower is expected to receive SnapshotComplete and match the leader's new log.
// NOTE(review): this listing is lossy - the embedded line numbers skip (e.g. 200 -> 203) and neither a test
// annotation nor a closing brace is visible for this method. Check the complete file before changing code.
173 public void testFollowerResyncWithOneMoreLeaderLogEntryAfterNonPersistentLeaderRestart() {
174 testLog.info("testFollowerResyncWithOneMoreLeaderLogEntryAfterNonPersistentLeaderRestart starting");
176 setupLeaderAndNonVotingFollower();
178 // Add log entries and verify they are committed and applied by both nodes.
180 expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
181 expSnapshotState.add(sendPayloadData(leaderActor, "one"));
// Wait until each node's collector has seen two ApplyState messages, one per payload sent above.
183 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
184 MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 2);
186 assertEquals("Leader journal lastIndex", 1, leaderContext.getReplicatedLog().lastIndex());
187 assertEquals("Leader commit index", 1, leaderContext.getCommitIndex());
188 assertEquals("Follower journal lastIndex", 1, follower1Context.getReplicatedLog().lastIndex());
189 assertEquals("Follower commit index", 1, follower1Context.getCommitIndex());
190 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
192 // Restart the leader
194 killActor(leaderActor);
// Discard the follower's collected messages so later expectations only see post-restart traffic.
195 MessageCollectorActor.clearMessages(follower1CollectorActor);
197 // Temporarily drop AppendEntries to simulate a disconnect when the leader restarts.
198 followerInstance.startDropMessages(AppendEntries.class);
200 createNewLeaderActor();
// NOTE(review): currentTerm is compared below, but no update to it is visible after setup (embedded
// lines 201-202 are missing from this listing) - confirm whether a statement was lost.
203 assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
204 assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
205 assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
207 // Add new log entries to the leader - one more than the prior log entries
// createNewLeaderActor() cleared expSnapshotState, so it now accumulates only the post-restart payloads.
209 expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
210 expSnapshotState.add(sendPayloadData(leaderActor, "one-1"));
211 expSnapshotState.add(sendPayloadData(leaderActor, "two-1"));
// Only the leader is checked here; the follower is still dropping AppendEntries at this point.
213 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
214 assertEquals("Leader journal lastIndex", 2, leaderContext.getReplicatedLog().lastIndex());
215 assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
// The leader's replicatedToAllIndex is expected to still be -1 while the follower is disconnected.
216 assertEquals("Leader replicatedToAllIndex", -1, leaderInstance.getCurrentBehavior().getReplicatedToAllIndex());
218 // Re-enable AppendEntries to the follower. The follower's log will be out of sync and it should
219 // force the leader to install snapshot to re-sync the entire follower's log and state.
221 followerInstance.stopDropMessages(AppendEntries.class);
222 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
// After the snapshot the follower is expected to match the leader's three post-restart entries.
224 assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
225 assertEquals("Follower journal lastIndex", 2, follower1Context.getReplicatedLog().lastIndex());
226 assertEquals("Follower journal lastTerm", currentTerm, follower1Context.getReplicatedLog().lastTerm());
227 assertEquals("Follower commit index", 2, follower1Context.getCommitIndex());
228 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
230 testLog.info("testFollowerResyncWithOneMoreLeaderLogEntryAfterNonPersistentLeaderRestart ending");
234 * Tests non-voting follower re-sync after the non-persistent leader restarts and commits new log
235 * entries prior to re-connecting to the follower. The leader's last index will be greater than the
236 * follower's last index corresponding to the previous data retained in memory. So the follower's log
237 * will be behind the leader's log but the leader's log entries will have a higher term. It also adds a
238 * "down" peer on restart so the leader doesn't trim its log as it's trying to resync the follower.
239 * Eventually the follower should force the leader to install snapshot to re-sync its state.
// Scenario: both nodes apply three payloads; the leader actor is killed; the journal is re-seeded with a
// server configuration that adds a voting follower2 and a non-voting "downPeer"; follower2 and a new leader
// are started while follower1 drops AppendEntries; the leader commits five payloads; AppendEntries are then
// re-enabled and follower1 is expected to receive SnapshotComplete and match the leader's new log.
// NOTE(review): this listing is lossy - the embedded line numbers skip (e.g. 254 -> 257, 323 -> 326) and
// neither a test annotation nor a closing brace is visible for this method. Check the complete file before
// changing code.
242 public void testFollowerResyncWithMoreLeaderLogEntriesAndDownPeerAfterNonPersistentLeaderRestart() {
243 testLog.info("testFollowerResyncWithMoreLeaderLogEntriesAndDownPeerAfterNonPersistentLeaderRestart starting");
245 setupLeaderAndNonVotingFollower();
247 // Add log entries and verify they are committed and applied by both nodes.
249 expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
250 expSnapshotState.add(sendPayloadData(leaderActor, "one"));
251 expSnapshotState.add(sendPayloadData(leaderActor, "two"));
// Wait until each node's collector has seen one ApplyState per payload sent above.
253 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, expSnapshotState.size());
254 MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, expSnapshotState.size());
// NOTE(review): lastIndex is used below but its declaration is not visible (embedded lines 255-256 are
// missing from this listing) - confirm its type and initial value against the complete file.
257 assertEquals("Leader journal lastIndex", lastIndex, leaderContext.getReplicatedLog().lastIndex());
258 assertEquals("Leader commit index", lastIndex, leaderContext.getCommitIndex());
259 assertEquals("Follower journal lastIndex", lastIndex, follower1Context.getReplicatedLog().lastIndex());
260 assertEquals("Follower commit index", lastIndex, follower1Context.getCommitIndex());
261 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
// Wait for a fresh AppendEntries to reach the follower before checking its snapshot index
// (presumably so the follower has had a chance to trim its log - TODO confirm).
263 MessageCollectorActor.clearMessages(follower1CollectorActor);
264 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
265 assertEquals("Follower snapshot index", lastIndex - 1, follower1Context.getReplicatedLog().getSnapshotIndex());
// NOTE(review): the message says "Follower journal size" but the value read is leaderContext's log size -
// confirm which node is meant.
266 assertEquals("Follower journal size", 1, leaderContext.getReplicatedLog().size());
268 // Restart the leader
270 killActor(leaderActor);
// Discard the follower's collected messages so later expectations only see post-restart traffic.
271 MessageCollectorActor.clearMessages(follower1CollectorActor);
273 // Temporarily drop AppendEntries to simulate a disconnect when the leader restarts.
274 followerInstance.startDropMessages(AppendEntries.class);
276 // Add a "down" peer so the leader doesn't trim its log as it's trying to resync the follower. The
277 // leader will keep decrementing the follower's nextIndex to try to find a matching index. Since
278 // there is no matching index it will eventually hit index 0 which should cause the follower to
279 // force an install snapshot upon failure to remove the conflicting indexes due to indexes 0 and 1
280 // being in the prior snapshot and not the log.
282 // We also add another voting follower actor into the mix even though it shouldn't affect the outcome.
// New configuration: leader (voting), follower1 (non-voting), follower2 (voting), "downPeer" (non-voting).
// No actor is created for "downPeer" in this method.
284 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
285 new ServerInfo(leaderId, true), new ServerInfo(follower1Id, false),
286 new ServerInfo(follower2Id, true), new ServerInfo("downPeer", false)));
287 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, currentTerm,
288 persistedServerConfig);
// Replace all persisted journal content: the leader gets an election-term entry plus the new server
// configuration, follower2 gets the server configuration only.
290 InMemoryJournal.clear();
291 InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(currentTerm, leaderId));
292 InMemoryJournal.addEntry(leaderId, 2, persistedServerConfigEntry);
293 InMemoryJournal.addEntry(follower2Id, 1, persistedServerConfigEntry);
// Start follower2 with the DisableElectionsRaftPolicy so that, presumably, it never starts an election
// itself - TODO confirm the policy's exact semantics.
295 DefaultConfigParamsImpl follower2ConfigParams = newFollowerConfigParams();
296 follower2ConfigParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
297 follower2Actor = newTestRaftActor(follower2Id, TestRaftActor.newBuilder().peerAddresses(
298 ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString())).
299 config(follower2ConfigParams).persistent(Optional.of(false)));
300 TestRaftActor follower2Instance = follower2Actor.underlyingActor();
301 follower2Instance.waitForRecoveryComplete();
302 follower2CollectorActor = follower2Instance.collectorActor();
// The re-created leader must know both followers' addresses; createNewLeaderActor() reads peerAddresses.
304 peerAddresses = ImmutableMap.of(follower1Id, follower1Actor.path().toString(),
305 follower2Id, follower2Actor.path().toString());
307 createNewLeaderActor();
// NOTE(review): currentTerm is compared below, but no update to it is visible after setup (embedded
// lines 308-309 are missing from this listing) - confirm whether a statement was lost.
310 assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
311 assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
312 assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
314 // Add new log entries to the leader - several more than the prior log entries
// createNewLeaderActor() cleared expSnapshotState, so it now accumulates only the post-restart payloads.
316 expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
317 expSnapshotState.add(sendPayloadData(leaderActor, "one-1"));
318 expSnapshotState.add(sendPayloadData(leaderActor, "two-1"));
319 expSnapshotState.add(sendPayloadData(leaderActor, "three-1"));
320 expSnapshotState.add(sendPayloadData(leaderActor, "four-1"));
// The leader and follower2 are expected to apply all five entries; follower1 is still dropping
// AppendEntries at this point.
322 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, expSnapshotState.size());
323 MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, expSnapshotState.size());
// NOTE(review): lastIndex is presumably reassigned on one of the missing embedded lines 324-325 to
// reflect the five new entries - confirm against the complete file.
326 assertEquals("Leader journal lastIndex", lastIndex, leaderContext.getReplicatedLog().lastIndex());
327 assertEquals("Leader commit index", lastIndex, leaderContext.getCommitIndex());
// The leader is expected not to have trimmed its log: snapshot index and replicatedToAllIndex remain -1.
328 assertEquals("Leader snapshot index", -1, leaderContext.getReplicatedLog().getSnapshotIndex());
329 assertEquals("Leader replicatedToAllIndex", -1, leaderInstance.getCurrentBehavior().getReplicatedToAllIndex());
331 // Re-enable AppendEntries to the follower. The follower's log will be out of sync and it should
332 // force the leader to install snapshot to re-sync the entire follower's log and state.
334 followerInstance.stopDropMessages(AppendEntries.class);
335 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
// After the snapshot follower1 is expected to match the leader's post-restart log and state.
337 assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
338 assertEquals("Follower journal lastIndex", lastIndex, follower1Context.getReplicatedLog().lastIndex());
339 assertEquals("Follower journal lastTerm", currentTerm, follower1Context.getReplicatedLog().lastTerm());
340 assertEquals("Follower commit index", lastIndex, follower1Context.getCommitIndex());
341 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
343 testLog.info("testFollowerResyncWithMoreLeaderLogEntriesAndDownPeerAfterNonPersistentLeaderRestart ending");
// Creates a new leader actor under leaderId using the current peerAddresses and leaderConfigParams, waits
// until it is leader, and refreshes the leaderInstance, leaderCollectorActor and leaderContext references.
// Also clears expSnapshotState, so callers start accumulating expected state from scratch afterwards.
// NOTE(review): the closing brace of this method is not visible in this listing.
346 private void createNewLeaderActor() {
347 expSnapshotState.clear();
// The actor is created with persistence explicitly set to false.
348 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
349 config(leaderConfigParams).persistent(Optional.of(false)));
350 leaderInstance = leaderActor.underlyingActor();
351 leaderCollectorActor = leaderInstance.collectorActor();
352 waitUntilLeader(leaderActor);
// The context is fetched only after the actor has become leader.
353 leaderContext = leaderInstance.getRaftActorContext();
// Shared fixture: seeds the in-memory journal for the leader and follower1 with an election-term entry and
// a server configuration (leader voting, follower1 non-voting), starts both actors with persistence set to
// false, waits for the leader to be elected, and verifies term, server config and voting status on both.
// NOTE(review): this listing is lossy - the embedded line numbers skip (e.g. 357 -> 360) and no closing
// brace is visible for this method. Check the complete file before changing code.
356 private void setupLeaderAndNonVotingFollower() {
357 snapshotBatchCount = 100;
360 // Set up a persisted ServerConfigurationPayload with the leader voting and the follower non-voting.
362 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
363 new ServerInfo(leaderId, true), new ServerInfo(follower1Id, false)));
// NOTE(review): initialTerm is used below but its declaration is not visible (embedded lines 358-359 are
// missing from this listing) - confirm its type and value against the complete file.
364 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, initialTerm,
365 persistedServerConfig);
// Both nodes get the same two journal entries: the election term, then the server configuration.
367 InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId));
368 InMemoryJournal.addEntry(leaderId, 2, persistedServerConfigEntry);
369 InMemoryJournal.addEntry(follower1Id, 1, new UpdateElectionTerm(initialTerm, leaderId));
370 InMemoryJournal.addEntry(follower1Id, 2, persistedServerConfigEntry);
// The follower is created first so that its actor path can be handed to the leader as a peer address.
372 DefaultConfigParamsImpl followerConfigParams = newFollowerConfigParams();
373 follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
374 ImmutableMap.of(leaderId, testActorPath(leaderId))).config(followerConfigParams).
375 persistent(Optional.of(false)));
377 peerAddresses = ImmutableMap.<String, String>builder().
378 put(follower1Id, follower1Actor.path().toString()).build();
380 leaderConfigParams = newLeaderConfigParams();
381 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
382 config(leaderConfigParams).persistent(Optional.of(false)));
// Cache the underlying actors, their message collectors and their Raft contexts for use in the tests.
384 followerInstance = follower1Actor.underlyingActor();
385 follower1CollectorActor = followerInstance.collectorActor();
387 leaderInstance = leaderActor.underlyingActor();
388 leaderCollectorActor = leaderInstance.collectorActor();
390 leaderContext = leaderInstance.getRaftActorContext();
391 follower1Context = followerInstance.getRaftActorContext();
393 waitUntilLeader(leaderActor);
395 // Verify leader's context after startup
// The leader's term is expected to be one above the persisted initialTerm once it has become leader.
397 currentTerm = initialTerm + 1;
398 assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
// Server configs are compared as sets, so the order of the ServerInfo entries does not matter.
399 assertEquals("Leader server config", Sets.newHashSet(persistedServerConfig.getServerConfig()),
400 Sets.newHashSet(leaderContext.getPeerServerInfo(true).getServerConfig()));
401 assertEquals("Leader isVotingMember", true, leaderContext.isVotingMember());
403 // Verify follower's context after startup
// Wait for the first AppendEntries to reach the follower before inspecting its term and configuration.
405 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
406 assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
407 assertEquals("Follower server config", Sets.newHashSet(persistedServerConfig.getServerConfig()),
408 Sets.newHashSet(follower1Context.getPeerServerInfo(true).getServerConfig()));
// NOTE(review): the assertion message below lacks a space ("FollowerisVotingMember"); left unchanged
// because it is a runtime string.
409 assertEquals("FollowerisVotingMember", false, follower1Context.isVotingMember());