2 * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
12 import akka.actor.ActorRef;
13 import com.google.common.base.Optional;
14 import com.google.common.collect.ImmutableMap;
15 import com.google.common.collect.Sets;
16 import java.util.Arrays;
17 import java.util.concurrent.TimeUnit;
18 import org.junit.Test;
19 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
20 import org.opendaylight.controller.cluster.raft.AbstractRaftActorIntegrationTest.TestRaftActor.Builder;
21 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
22 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
23 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
24 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
25 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
26 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
27 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
28 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
29 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
30 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
31 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
32 import scala.concurrent.duration.FiniteDuration;
/**
 * Integration test for various scenarios involving non-voting followers.
 *
 * @author Thomas Pantelis
 */
public class NonVotingFollowerIntegrationTest extends AbstractRaftActorIntegrationTest {
    // Underlying actor of the non-voting follower (follower1); assigned in setupLeaderAndNonVotingFollower().
    private TestRaftActor followerInstance;
    // Underlying actor of the current leader; reassigned whenever the leader actor is (re)created.
    private TestRaftActor leaderInstance;
    // Builder for follower1 kept as a field so a test can customize it (e.g. set a role change notifier)
    // before setupLeaderAndNonVotingFollower() creates the follower actor from it.
    private final Builder follower1Builder = TestRaftActor.newBuilder();
45 * Tests non-voting follower re-sync after the non-persistent leader restarts with an empty log. In this
46 * case the follower's log will be ahead of the leader's log as the follower retains the previous
47 * data in memory. The leader must force an install snapshot to re-sync the follower's state.
50 public void testFollowerResyncWithEmptyLeaderLogAfterNonPersistentLeaderRestart() {
51 testLog.info("testFollowerResyncWithEmptyLeaderLogAfterNonPersistentLeaderRestart starting");
53 setupLeaderAndNonVotingFollower();
55 // Add log entries and verify they are committed and applied by both nodes.
57 expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
58 expSnapshotState.add(sendPayloadData(leaderActor, "one"));
59 expSnapshotState.add(sendPayloadData(leaderActor, "two"));
61 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
62 MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
64 assertEquals("Leader journal lastIndex", 2, leaderContext.getReplicatedLog().lastIndex());
65 assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
66 assertEquals("Follower journal lastIndex", 2, follower1Context.getReplicatedLog().lastIndex());
67 assertEquals("Follower commit index", 2, follower1Context.getCommitIndex());
68 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
70 // Persisted journal should only contain the ServerConfigurationPayload and 2 UpdateElectionTerm entries.
71 assertEquals("Leader persisted journal size", 3, InMemoryJournal.get(leaderId).size());
75 killActor(leaderActor);
76 MessageCollectorActor.clearMessages(follower1CollectorActor);
78 createNewLeaderActor();
80 //follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
83 assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
84 assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
85 assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
87 // After restart, the leader's log and the follower's log will be ahead so the leader should force an
88 // install snapshot to re-sync the follower's log and state.
90 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
92 assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
93 assertEquals("Follower journal lastIndex", -1, follower1Context.getReplicatedLog().lastIndex());
94 assertEquals("Follower commit index", -1, follower1Context.getCommitIndex());
96 expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
98 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
100 assertEquals("Follower journal lastIndex", 0, follower1Context.getReplicatedLog().lastIndex());
101 assertEquals("Follower commit index", 0, follower1Context.getCommitIndex());
102 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
104 testLog.info("testFollowerResyncWithEmptyLeaderLogAfterNonPersistentLeaderRestart ending");
108 * Tests non-voting follower re-sync after the non-persistent leader restarts and commits new log
109 * entries prior to re-connecting to the follower. The leader's last index will still be less than the
110 * follower's last index corresponding to the previous data retained in memory. So the follower's log
111 * will be ahead of the leader's log and the leader must force an install snapshot to re-sync the
115 public void testFollowerResyncWithLessLeaderLogEntriesAfterNonPersistentLeaderRestart() {
116 testLog.info("testFollowerResyncWithLessLeaderLogEntriesAfterNonPersistentLeaderRestart starting");
118 setupLeaderAndNonVotingFollower();
120 // Add log entries and verify they are committed and applied by both nodes.
122 expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
123 expSnapshotState.add(sendPayloadData(leaderActor, "one"));
124 expSnapshotState.add(sendPayloadData(leaderActor, "two"));
126 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
127 MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
129 assertEquals("Leader journal lastIndex", 2, leaderContext.getReplicatedLog().lastIndex());
130 assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
131 assertEquals("Follower journal lastIndex", 2, follower1Context.getReplicatedLog().lastIndex());
132 assertEquals("Follower commit index", 2, follower1Context.getCommitIndex());
133 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
135 // Restart the leader
137 killActor(leaderActor);
138 MessageCollectorActor.clearMessages(follower1CollectorActor);
140 // Temporarily drop AppendEntries to simulate a disconnect when the leader restarts.
141 followerInstance.startDropMessages(AppendEntries.class);
143 createNewLeaderActor();
146 assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
147 assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
148 assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
150 // Add new log entries to the leader - one less than the prior log entries
152 expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
153 expSnapshotState.add(sendPayloadData(leaderActor, "one-1"));
155 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
156 assertEquals("Leader journal lastIndex", 1, leaderContext.getReplicatedLog().lastIndex());
157 assertEquals("Leader commit index", 1, leaderContext.getCommitIndex());
159 // Re-enable AppendEntries to the follower. The leaders previous index will be present in the
160 // follower's but the terms won't match and the follower's log will be ahead of the leader's log
161 // The leader should force an install snapshot to re-sync the entire follower's log and state.
163 followerInstance.stopDropMessages(AppendEntries.class);
164 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
166 assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
167 assertEquals("Follower journal lastIndex", 1, follower1Context.getReplicatedLog().lastIndex());
168 assertEquals("Follower journal lastTerm", currentTerm, follower1Context.getReplicatedLog().lastTerm());
169 assertEquals("Follower commit index", 1, follower1Context.getCommitIndex());
170 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
172 testLog.info("testFollowerResyncWithLessLeaderLogEntriesAfterNonPersistentLeaderRestart ending");
176 * Tests non-voting follower re-sync after the non-persistent leader restarts and commits new log
177 * entries prior to re-connecting to the follower. The leader's last index will be 1 greater than the
178 * follower's last index corresponding to the previous data retained in memory. So the follower's log
179 * will be behind the leader's log but the leader's log entries will have a higher term. In this case the
180 * leader should force an install snapshot to re-sync the follower's state.
183 public void testFollowerResyncWithOneMoreLeaderLogEntryAfterNonPersistentLeaderRestart() {
184 testLog.info("testFollowerResyncWithOneMoreLeaderLogEntryAfterNonPersistentLeaderRestart starting");
186 setupLeaderAndNonVotingFollower();
188 // Add log entries and verify they are committed and applied by both nodes.
190 expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
191 expSnapshotState.add(sendPayloadData(leaderActor, "one"));
193 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
194 MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 2);
196 assertEquals("Leader journal lastIndex", 1, leaderContext.getReplicatedLog().lastIndex());
197 assertEquals("Leader commit index", 1, leaderContext.getCommitIndex());
198 assertEquals("Follower journal lastIndex", 1, follower1Context.getReplicatedLog().lastIndex());
199 assertEquals("Follower commit index", 1, follower1Context.getCommitIndex());
200 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
202 // Restart the leader
204 killActor(leaderActor);
205 MessageCollectorActor.clearMessages(follower1CollectorActor);
207 // Temporarily drop AppendEntries to simulate a disconnect when the leader restarts.
208 followerInstance.startDropMessages(AppendEntries.class);
210 createNewLeaderActor();
213 assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
214 assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
215 assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
217 // Add new log entries to the leader - one more than the prior log entries
219 expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
220 expSnapshotState.add(sendPayloadData(leaderActor, "one-1"));
221 expSnapshotState.add(sendPayloadData(leaderActor, "two-1"));
223 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
224 assertEquals("Leader journal lastIndex", 2, leaderContext.getReplicatedLog().lastIndex());
225 assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
226 assertEquals("Leader replicatedToAllIndex", -1, leaderInstance.getCurrentBehavior().getReplicatedToAllIndex());
228 // Re-enable AppendEntries to the follower. The follower's log will be out of sync and it should
229 // should force the leader to install snapshot to re-sync the entire follower's log and state.
231 followerInstance.stopDropMessages(AppendEntries.class);
232 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
234 assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
235 assertEquals("Follower journal lastIndex", 2, follower1Context.getReplicatedLog().lastIndex());
236 assertEquals("Follower journal lastTerm", currentTerm, follower1Context.getReplicatedLog().lastTerm());
237 assertEquals("Follower commit index", 2, follower1Context.getCommitIndex());
238 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
240 testLog.info("testFollowerResyncWithOneMoreLeaderLogEntryAfterNonPersistentLeaderRestart ending");
244 * Tests non-voting follower re-sync after the non-persistent leader restarts and commits new log
245 * entries prior to re-connecting to the follower. The leader's last index will be greater than the
246 * follower's last index corresponding to the previous data retained in memory. So the follower's log
247 * will be behind the leader's log but the leader's log entries will have a higher term. It also adds a
248 * "down" peer on restart so the leader doesn't trim its log as it's trying to resync the follower.
249 * Eventually the follower should force the leader to install snapshot to re-sync its state.
252 public void testFollowerResyncWithMoreLeaderLogEntriesAndDownPeerAfterNonPersistentLeaderRestart() {
253 testLog.info("testFollowerResyncWithMoreLeaderLogEntriesAndDownPeerAfterNonPersistentLeaderRestart starting");
255 setupLeaderAndNonVotingFollower();
257 // Add log entries and verify they are committed and applied by both nodes.
259 expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
260 expSnapshotState.add(sendPayloadData(leaderActor, "one"));
261 expSnapshotState.add(sendPayloadData(leaderActor, "two"));
263 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, expSnapshotState.size());
264 MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, expSnapshotState.size());
267 assertEquals("Leader journal lastIndex", lastIndex, leaderContext.getReplicatedLog().lastIndex());
268 assertEquals("Leader commit index", lastIndex, leaderContext.getCommitIndex());
269 assertEquals("Follower journal lastIndex", lastIndex, follower1Context.getReplicatedLog().lastIndex());
270 assertEquals("Follower commit index", lastIndex, follower1Context.getCommitIndex());
271 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
273 MessageCollectorActor.clearMessages(follower1CollectorActor);
274 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
275 assertEquals("Follower snapshot index", lastIndex - 1, follower1Context.getReplicatedLog().getSnapshotIndex());
276 assertEquals("Follower journal size", 1, leaderContext.getReplicatedLog().size());
278 // Restart the leader
280 killActor(leaderActor);
281 MessageCollectorActor.clearMessages(follower1CollectorActor);
283 // Temporarily drop AppendEntries to simulate a disconnect when the leader restarts.
284 followerInstance.startDropMessages(AppendEntries.class);
286 // Add a "down" peer so the leader doesn't trim its log as it's trying to resync the follower. The
287 // leader will keep decrementing the follower's nextIndex to try to find a matching index. Since
288 // there is no matching index it will eventually hit index 0 which should cause the follower to
289 // force an install snapshot upon failure to remove the conflicting indexes due to indexes 0 and 1
290 // being in the prior snapshot and not the log.
292 // We also add another voting follower actor into the mix even though it shoildn't affect the
294 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
295 new ServerInfo(leaderId, true), new ServerInfo(follower1Id, false),
296 new ServerInfo(follower2Id, true), new ServerInfo("downPeer", false)));
297 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, currentTerm,
298 persistedServerConfig);
300 InMemoryJournal.clear();
301 InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(currentTerm, leaderId));
302 InMemoryJournal.addEntry(leaderId, 2, persistedServerConfigEntry);
303 InMemoryJournal.addEntry(follower2Id, 1, persistedServerConfigEntry);
305 DefaultConfigParamsImpl follower2ConfigParams = newFollowerConfigParams();
306 follower2ConfigParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
307 follower2Actor = newTestRaftActor(follower2Id, TestRaftActor.newBuilder().peerAddresses(
308 ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString()))
309 .config(follower2ConfigParams).persistent(Optional.of(false)));
310 TestRaftActor follower2Instance = follower2Actor.underlyingActor();
311 follower2Instance.waitForRecoveryComplete();
312 follower2CollectorActor = follower2Instance.collectorActor();
314 peerAddresses = ImmutableMap.of(follower1Id, follower1Actor.path().toString(),
315 follower2Id, follower2Actor.path().toString());
317 createNewLeaderActor();
320 assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
321 assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
322 assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
324 // Add new log entries to the leader - several more than the prior log entries
326 expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
327 expSnapshotState.add(sendPayloadData(leaderActor, "one-1"));
328 expSnapshotState.add(sendPayloadData(leaderActor, "two-1"));
329 expSnapshotState.add(sendPayloadData(leaderActor, "three-1"));
330 expSnapshotState.add(sendPayloadData(leaderActor, "four-1"));
332 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, expSnapshotState.size());
333 MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, expSnapshotState.size());
336 assertEquals("Leader journal lastIndex", lastIndex, leaderContext.getReplicatedLog().lastIndex());
337 assertEquals("Leader commit index", lastIndex, leaderContext.getCommitIndex());
338 assertEquals("Leader snapshot index", -1, leaderContext.getReplicatedLog().getSnapshotIndex());
339 assertEquals("Leader replicatedToAllIndex", -1, leaderInstance.getCurrentBehavior().getReplicatedToAllIndex());
341 // Re-enable AppendEntries to the follower. The follower's log will be out of sync and it should
342 // should force the leader to install snapshot to re-sync the entire follower's log and state.
344 followerInstance.stopDropMessages(AppendEntries.class);
345 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
347 assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
348 assertEquals("Follower journal lastIndex", lastIndex, follower1Context.getReplicatedLog().lastIndex());
349 assertEquals("Follower journal lastTerm", currentTerm, follower1Context.getReplicatedLog().lastTerm());
350 assertEquals("Follower commit index", lastIndex, follower1Context.getCommitIndex());
351 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
353 testLog.info("testFollowerResyncWithMoreLeaderLogEntriesAndDownPeerAfterNonPersistentLeaderRestart ending");
357 public void testFollowerLeaderStateChanges() {
358 testLog.info("testFollowerLeaderStateChanges");
360 ActorRef roleChangeNotifier = factory.createActor(
361 MessageCollectorActor.props(), factory.generateActorId("roleChangeNotifier"));
362 follower1Builder.roleChangeNotifier(roleChangeNotifier);
364 setupLeaderAndNonVotingFollower();
366 ((DefaultConfigParamsImpl)follower1Context.getConfigParams()).setElectionTimeoutFactor(2);
367 ((DefaultConfigParamsImpl)follower1Context.getConfigParams())
368 .setHeartBeatInterval(FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
370 MessageCollectorActor.clearMessages(roleChangeNotifier);
371 follower1Actor.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
372 followerInstance.startDropMessages(AppendEntries.class);
374 LeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(roleChangeNotifier,
375 LeaderStateChanged.class);
376 assertEquals("getLeaderId", null, leaderStateChanged.getLeaderId());
378 MessageCollectorActor.clearMessages(roleChangeNotifier);
379 followerInstance.stopDropMessages(AppendEntries.class);
381 leaderStateChanged = MessageCollectorActor.expectFirstMatching(roleChangeNotifier,
382 LeaderStateChanged.class);
383 assertEquals("getLeaderId", leaderId, leaderStateChanged.getLeaderId());
386 private void createNewLeaderActor() {
387 expSnapshotState.clear();
388 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
389 .config(leaderConfigParams).persistent(Optional.of(false)));
390 leaderInstance = leaderActor.underlyingActor();
391 leaderCollectorActor = leaderInstance.collectorActor();
392 waitUntilLeader(leaderActor);
393 leaderContext = leaderInstance.getRaftActorContext();
396 private void setupLeaderAndNonVotingFollower() {
397 snapshotBatchCount = 100;
398 int persistedTerm = 1;
400 // Set up a persisted ServerConfigurationPayload with the leader voting and the follower non-voting.
402 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
403 new ServerInfo(leaderId, true), new ServerInfo(follower1Id, false)));
404 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, persistedTerm,
405 persistedServerConfig);
407 InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(persistedTerm, leaderId));
408 InMemoryJournal.addEntry(leaderId, 2, persistedServerConfigEntry);
409 InMemoryJournal.addEntry(follower1Id, 1, new UpdateElectionTerm(persistedTerm, leaderId));
410 InMemoryJournal.addEntry(follower1Id, 2, persistedServerConfigEntry);
412 DefaultConfigParamsImpl followerConfigParams = newFollowerConfigParams();
413 follower1Actor = newTestRaftActor(follower1Id, follower1Builder.peerAddresses(
414 ImmutableMap.of(leaderId, testActorPath(leaderId))).config(followerConfigParams)
415 .persistent(Optional.of(false)));
417 peerAddresses = ImmutableMap.<String, String>builder()
418 .put(follower1Id, follower1Actor.path().toString()).build();
420 leaderConfigParams = newLeaderConfigParams();
421 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
422 .config(leaderConfigParams).persistent(Optional.of(false)));
424 followerInstance = follower1Actor.underlyingActor();
425 follower1CollectorActor = followerInstance.collectorActor();
427 leaderInstance = leaderActor.underlyingActor();
428 leaderCollectorActor = leaderInstance.collectorActor();
430 leaderContext = leaderInstance.getRaftActorContext();
431 follower1Context = followerInstance.getRaftActorContext();
433 waitUntilLeader(leaderActor);
435 // Verify leader's context after startup
437 currentTerm = persistedTerm + 1;
438 assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
439 assertEquals("Leader server config", Sets.newHashSet(persistedServerConfig.getServerConfig()),
440 Sets.newHashSet(leaderContext.getPeerServerInfo(true).getServerConfig()));
441 assertEquals("Leader isVotingMember", true, leaderContext.isVotingMember());
443 // Verify follower's context after startup
445 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
446 assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
447 assertEquals("Follower server config", Sets.newHashSet(persistedServerConfig.getServerConfig()),
448 Sets.newHashSet(follower1Context.getPeerServerInfo(true).getServerConfig()));
449 assertEquals("FollowerisVotingMember", false, follower1Context.isVotingMember());