2 * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import akka.actor.ActorRef;
12 import akka.dispatch.Dispatchers;
13 import com.google.common.base.Optional;
14 import com.google.common.collect.ImmutableMap;
15 import com.google.common.collect.Sets;
16 import java.util.Arrays;
17 import java.util.concurrent.TimeUnit;
18 import org.junit.Test;
19 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
20 import org.opendaylight.controller.cluster.raft.AbstractRaftActorIntegrationTest.TestRaftActor.Builder;
21 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
22 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
23 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
24 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
25 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
27 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
28 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
29 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
30 import scala.concurrent.duration.FiniteDuration;
33 * Integration test for various scenarios involving non-voting followers.
35 * @author Thomas Pantelis
37 public class NonVotingFollowerIntegrationTest extends AbstractRaftActorIntegrationTest {
// Underlying TestRaftActor of follower1Actor; assigned in setupLeaderAndNonVotingFollower().
38 private TestRaftActor followerInstance;
// Underlying TestRaftActor of the current leaderActor; assigned in setupLeaderAndNonVotingFollower()
// and re-assigned by createNewLeaderActor() each time the leader is recreated.
39 private TestRaftActor leaderInstance;
// Builder used by setupLeaderAndNonVotingFollower() to create follower1. A test can customize it
// before calling the setup method (testFollowerLeaderStateChanges registers a roleChangeNotifier).
40 private final Builder follower1Builder = TestRaftActor.newBuilder();
43 * Tests non-voting follower re-sync after the non-persistent leader restarts with an empty log. In this
44 * case the follower's log will be ahead of the leader's log as the follower retains the previous
45 * data in memory. The leader must force an install snapshot to re-sync the follower's state.
48 public void testFollowerResyncWithEmptyLeaderLogAfterNonPersistentLeaderRestart() {
// Scenario: three entries are committed on both nodes, the non-persistent leader is killed and
// recreated with an empty log, and the follower is then expected to complete a snapshot
// (SnapshotComplete) that leaves it with an empty log before a new entry is replicated to it.
49 testLog.info("testFollowerResyncWithEmptyLeaderLogAfterNonPersistentLeaderRestart starting");
// Start a voting leader and a non-voting follower (see setupLeaderAndNonVotingFollower()).
51 setupLeaderAndNonVotingFollower();
53 // Add log entries and verify they are committed and applied by both nodes.
55 expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
56 expSnapshotState.add(sendPayloadData(leaderActor, "one"));
57 expSnapshotState.add(sendPayloadData(leaderActor, "two"));
// One ApplyState is expected per payload on each node's collector actor.
59 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
60 MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
// Three entries occupy indexes 0..2, so both the last index and the commit index must be 2.
62 assertEquals("Leader journal lastIndex", 2, leaderContext.getReplicatedLog().lastIndex());
63 assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
64 assertEquals("Follower journal lastIndex", 2, follower1Context.getReplicatedLog().lastIndex());
65 assertEquals("Follower commit index", 2, follower1Context.getCommitIndex());
66 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
68 // Persisted journal should only contain the ServerConfigurationPayload and 2 UpdateElectionTerm entries.
69 assertEquals("Leader persisted journal size", 3, InMemoryJournal.get(leaderId).size());
// Restart the leader: kill the running leader actor and discard what the follower's collector has
// captured so far, so the expectations below only match messages received after this point.
73 killActor(leaderActor);
74 MessageCollectorActor.clearMessages(follower1CollectorActor);
// Recreate the leader as a non-persistent actor; this also clears expSnapshotState.
76 createNewLeaderActor();
// NOTE(review): the commented-out call below is dead code. Unlike the sibling tests, this scenario
// does not drop AppendEntries on the follower while the leader restarts.
78 //follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
// The recreated leader is expected to come up with an empty log (lastIndex and commit index of -1).
81 assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
82 assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
83 assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
85 // After restart, the leader's log will be empty and the follower's log will be ahead so the leader
86 // should force an install snapshot to re-sync the follower's log and state.
88 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
// Once the snapshot has completed, the follower must mirror the leader's empty log.
90 assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
91 assertEquals("Follower journal lastIndex", -1, follower1Context.getReplicatedLog().lastIndex());
92 assertEquals("Follower commit index", -1, follower1Context.getCommitIndex());
// Replicate one fresh entry. expSnapshotState was cleared by createNewLeaderActor(), so it now
// holds only this payload and the follower's applied state must equal exactly that.
94 expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
96 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
98 assertEquals("Follower journal lastIndex", 0, follower1Context.getReplicatedLog().lastIndex());
99 assertEquals("Follower commit index", 0, follower1Context.getCommitIndex());
100 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
102 testLog.info("testFollowerResyncWithEmptyLeaderLogAfterNonPersistentLeaderRestart ending");
106 * Tests non-voting follower re-sync after the non-persistent leader restarts and commits new log
107 * entries prior to re-connecting to the follower. The leader's last index will still be less than the
108 * follower's last index corresponding to the previous data retained in memory. So the follower's log
109 * will be ahead of the leader's log and the leader must force an install snapshot to re-sync the follower's state.
113 public void testFollowerResyncWithLessLeaderLogEntriesAfterNonPersistentLeaderRestart() {
// Scenario: three entries are committed on both nodes, the non-persistent leader is recreated while
// the follower drops AppendEntries, the new leader commits only two entries, and after
// AppendEntries are re-enabled the follower is expected to be re-synced via a snapshot.
114 testLog.info("testFollowerResyncWithLessLeaderLogEntriesAfterNonPersistentLeaderRestart starting");
// Start a voting leader and a non-voting follower (see setupLeaderAndNonVotingFollower()).
116 setupLeaderAndNonVotingFollower();
118 // Add log entries and verify they are committed and applied by both nodes.
120 expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
121 expSnapshotState.add(sendPayloadData(leaderActor, "one"));
122 expSnapshotState.add(sendPayloadData(leaderActor, "two"));
// One ApplyState is expected per payload on each node's collector actor.
124 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
125 MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
// Three entries occupy indexes 0..2, so both the last index and the commit index must be 2.
127 assertEquals("Leader journal lastIndex", 2, leaderContext.getReplicatedLog().lastIndex());
128 assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
129 assertEquals("Follower journal lastIndex", 2, follower1Context.getReplicatedLog().lastIndex());
130 assertEquals("Follower commit index", 2, follower1Context.getCommitIndex());
131 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
133 // Restart the leader
135 killActor(leaderActor);
// Discard what the follower's collector captured so far so later expectations only see new messages.
136 MessageCollectorActor.clearMessages(follower1CollectorActor);
138 // Temporarily drop AppendEntries to simulate a disconnect when the leader restarts.
139 followerInstance.startDropMessages(AppendEntries.class);
// Recreate the leader as a non-persistent actor; this also clears expSnapshotState.
141 createNewLeaderActor();
// The recreated leader is expected to come up with an empty log (lastIndex and commit index of -1).
144 assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
145 assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
146 assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
148 // Add new log entries to the leader - one less than the prior log entries
150 expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
151 expSnapshotState.add(sendPayloadData(leaderActor, "one-1"));
// The leader alone is expected to apply both entries; the follower is still dropping AppendEntries.
153 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
154 assertEquals("Leader journal lastIndex", 1, leaderContext.getReplicatedLog().lastIndex());
155 assertEquals("Leader commit index", 1, leaderContext.getCommitIndex());
157 // Re-enable AppendEntries to the follower. The leader's previous index will be present in the
158 // follower's log but the terms won't match and the follower's log will be ahead of the leader's log.
159 // The leader should force an install snapshot to re-sync the entire follower's log and state.
161 followerInstance.stopDropMessages(AppendEntries.class);
162 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
// After the snapshot the follower must match the new leader: last index/commit index 1, entries in
// the current term, and applied state equal to the two payloads sent after the restart.
164 assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
165 assertEquals("Follower journal lastIndex", 1, follower1Context.getReplicatedLog().lastIndex());
166 assertEquals("Follower journal lastTerm", currentTerm, follower1Context.getReplicatedLog().lastTerm());
167 assertEquals("Follower commit index", 1, follower1Context.getCommitIndex());
168 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
170 testLog.info("testFollowerResyncWithLessLeaderLogEntriesAfterNonPersistentLeaderRestart ending");
174 * Tests non-voting follower re-sync after the non-persistent leader restarts and commits new log
175 * entries prior to re-connecting to the follower. The leader's last index will be 1 greater than the
176 * follower's last index corresponding to the previous data retained in memory. So the follower's log
177 * will be behind the leader's log but the leader's log entries will have a higher term. In this case the
178 * leader should force an install snapshot to re-sync the follower's state.
181 public void testFollowerResyncWithOneMoreLeaderLogEntryAfterNonPersistentLeaderRestart() {
// Scenario: two entries are committed on both nodes, the non-persistent leader is recreated while
// the follower drops AppendEntries, the new leader commits three entries, and after AppendEntries
// are re-enabled the follower is expected to be re-synced via a snapshot.
182 testLog.info("testFollowerResyncWithOneMoreLeaderLogEntryAfterNonPersistentLeaderRestart starting");
// Start a voting leader and a non-voting follower (see setupLeaderAndNonVotingFollower()).
184 setupLeaderAndNonVotingFollower();
186 // Add log entries and verify they are committed and applied by both nodes.
188 expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
189 expSnapshotState.add(sendPayloadData(leaderActor, "one"));
// One ApplyState is expected per payload on each node's collector actor.
191 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
192 MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 2);
// Two entries occupy indexes 0..1, so both the last index and the commit index must be 1.
194 assertEquals("Leader journal lastIndex", 1, leaderContext.getReplicatedLog().lastIndex());
195 assertEquals("Leader commit index", 1, leaderContext.getCommitIndex());
196 assertEquals("Follower journal lastIndex", 1, follower1Context.getReplicatedLog().lastIndex());
197 assertEquals("Follower commit index", 1, follower1Context.getCommitIndex());
198 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
200 // Restart the leader
202 killActor(leaderActor);
// Discard what the follower's collector captured so far so later expectations only see new messages.
203 MessageCollectorActor.clearMessages(follower1CollectorActor);
205 // Temporarily drop AppendEntries to simulate a disconnect when the leader restarts.
206 followerInstance.startDropMessages(AppendEntries.class);
// Recreate the leader as a non-persistent actor; this also clears expSnapshotState.
208 createNewLeaderActor();
// The recreated leader is expected to come up with an empty log (lastIndex and commit index of -1).
211 assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
212 assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
213 assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
215 // Add new log entries to the leader - one more than the prior log entries
217 expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
218 expSnapshotState.add(sendPayloadData(leaderActor, "one-1"));
219 expSnapshotState.add(sendPayloadData(leaderActor, "two-1"));
// The leader alone is expected to apply all three entries; the follower is still dropping AppendEntries.
221 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
222 assertEquals("Leader journal lastIndex", 2, leaderContext.getReplicatedLog().lastIndex());
223 assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
// NOTE(review): presumably -1 because the follower has not acknowledged any entry while it is
// dropping AppendEntries - confirm against the leader behavior implementation.
224 assertEquals("Leader replicatedToAllIndex", -1, leaderInstance.getCurrentBehavior().getReplicatedToAllIndex());
226 // Re-enable AppendEntries to the follower. The follower's log will be out of sync and it
227 // should force the leader to install snapshot to re-sync the entire follower's log and state.
229 followerInstance.stopDropMessages(AppendEntries.class);
230 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
// After the snapshot the follower must match the new leader: last index/commit index 2, entries in
// the current term, and applied state equal to the three payloads sent after the restart.
232 assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
233 assertEquals("Follower journal lastIndex", 2, follower1Context.getReplicatedLog().lastIndex());
234 assertEquals("Follower journal lastTerm", currentTerm, follower1Context.getReplicatedLog().lastTerm());
235 assertEquals("Follower commit index", 2, follower1Context.getCommitIndex());
236 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
238 testLog.info("testFollowerResyncWithOneMoreLeaderLogEntryAfterNonPersistentLeaderRestart ending");
242 * Tests non-voting follower re-sync after the non-persistent leader restarts and commits new log
243 * entries prior to re-connecting to the follower. The leader's last index will be greater than the
244 * follower's last index corresponding to the previous data retained in memory. So the follower's log
245 * will be behind the leader's log but the leader's log entries will have a higher term. It also adds a
246 * "down" peer on restart so the leader doesn't trim its log as it's trying to resync the follower.
247 * Eventually the follower should force the leader to install snapshot to re-sync its state.
250 public void testFollowerResyncWithMoreLeaderLogEntriesAndDownPeerAfterNonPersistentLeaderRestart() {
// Scenario: three entries are committed on both nodes, then the non-persistent leader is recreated
// with a server configuration that additionally contains a voting follower2 and a "downPeer" entry
// while follower1 drops AppendEntries. The new leader commits five entries, and after AppendEntries
// are re-enabled follower1 is expected to be re-synced via a snapshot.
251 testLog.info("testFollowerResyncWithMoreLeaderLogEntriesAndDownPeerAfterNonPersistentLeaderRestart starting");
// Start a voting leader and a non-voting follower (see setupLeaderAndNonVotingFollower()).
253 setupLeaderAndNonVotingFollower();
255 // Add log entries and verify they are committed and applied by both nodes.
257 expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
258 expSnapshotState.add(sendPayloadData(leaderActor, "one"));
259 expSnapshotState.add(sendPayloadData(leaderActor, "two"));
// One ApplyState is expected per payload sent so far on each node's collector actor.
261 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, expSnapshotState.size());
262 MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, expSnapshotState.size());
// Both nodes must have appended and committed everything up to lastIndex.
265 assertEquals("Leader journal lastIndex", lastIndex, leaderContext.getReplicatedLog().lastIndex());
266 assertEquals("Leader commit index", lastIndex, leaderContext.getCommitIndex());
267 assertEquals("Follower journal lastIndex", lastIndex, follower1Context.getReplicatedLog().lastIndex());
268 assertEquals("Follower commit index", lastIndex, follower1Context.getCommitIndex());
269 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
// Wait for one more AppendEntries to reach the follower before checking its snapshot index, which
// is expected to be lastIndex - 1 (presumably that message carries the index used for trimming the
// follower's in-memory log - confirm).
271 MessageCollectorActor.clearMessages(follower1CollectorActor);
272 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
273 assertEquals("Follower snapshot index", lastIndex - 1, follower1Context.getReplicatedLog().getSnapshotIndex());
// NOTE(review): the message says "Follower journal size" but the value is read from leaderContext;
// confirm whether follower1Context was intended here.
274 assertEquals("Follower journal size", 1, leaderContext.getReplicatedLog().size());
276 // Restart the leader
278 killActor(leaderActor);
// Discard what the follower's collector captured so far so later expectations only see new messages.
279 MessageCollectorActor.clearMessages(follower1CollectorActor);
281 // Temporarily drop AppendEntries to simulate a disconnect when the leader restarts.
282 followerInstance.startDropMessages(AppendEntries.class);
284 // Add a "down" peer so the leader doesn't trim its log as it's trying to resync the follower. The
285 // leader will keep decrementing the follower's nextIndex to try to find a matching index. Since
286 // there is no matching index it will eventually hit index 0 which should cause the follower to
287 // force an install snapshot upon failure to remove the conflicting indexes due to indexes 0 and 1
288 // being in the prior snapshot and not the log.
290 // We also add another voting follower actor into the mix even though it shouldn't affect the outcome.
// New server configuration: leader (voting), follower1 (non-voting), follower2 (voting) and
// "downPeer" (non-voting). No actor is created for "downPeer" in this method.
292 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
293 new ServerInfo(leaderId, true), new ServerInfo(follower1Id, false),
294 new ServerInfo(follower2Id, true), new ServerInfo("downPeer", false)));
295 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, currentTerm,
296 persistedServerConfig);
// Wipe the in-memory journal and re-seed it: the leader gets its election term plus the new server
// configuration, follower2 gets the new server configuration only.
298 InMemoryJournal.clear();
299 InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(currentTerm, leaderId));
300 InMemoryJournal.addEntry(leaderId, 2, persistedServerConfigEntry);
301 InMemoryJournal.addEntry(follower2Id, 1, persistedServerConfigEntry);
// Create follower2 as a non-persistent actor that uses DisableElectionsRaftPolicy and knows the
// leader and follower1 as peers, then wait until its recovery has completed.
303 DefaultConfigParamsImpl follower2ConfigParams = newFollowerConfigParams();
304 follower2ConfigParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
305 follower2Actor = newTestRaftActor(follower2Id, TestRaftActor.newBuilder().peerAddresses(
306 ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString())).
307 config(follower2ConfigParams).persistent(Optional.of(false)));
308 TestRaftActor follower2Instance = follower2Actor.underlyingActor();
309 follower2Instance.waitForRecoveryComplete();
310 follower2CollectorActor = follower2Instance.collectorActor();
// Peer addresses handed to the recreated leader: follower1 and follower2 only; "downPeer" gets no address.
312 peerAddresses = ImmutableMap.of(follower1Id, follower1Actor.path().toString(),
313 follower2Id, follower2Actor.path().toString());
// Recreate the leader as a non-persistent actor; this also clears expSnapshotState.
315 createNewLeaderActor();
// The recreated leader is expected to come up with an empty log (lastIndex and commit index of -1).
318 assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
319 assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
320 assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
322 // Add new log entries to the leader - several more than the prior log entries
324 expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
325 expSnapshotState.add(sendPayloadData(leaderActor, "one-1"));
326 expSnapshotState.add(sendPayloadData(leaderActor, "two-1"));
327 expSnapshotState.add(sendPayloadData(leaderActor, "three-1"));
328 expSnapshotState.add(sendPayloadData(leaderActor, "four-1"));
// The leader and follower2 are each expected to apply all five new entries; follower1 is still
// dropping AppendEntries at this point.
330 MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, expSnapshotState.size());
331 MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, expSnapshotState.size());
// The leader must have committed up to lastIndex without having trimmed its log: its snapshot index
// and replicatedToAllIndex are both expected to still be -1.
334 assertEquals("Leader journal lastIndex", lastIndex, leaderContext.getReplicatedLog().lastIndex());
335 assertEquals("Leader commit index", lastIndex, leaderContext.getCommitIndex());
336 assertEquals("Leader snapshot index", -1, leaderContext.getReplicatedLog().getSnapshotIndex());
337 assertEquals("Leader replicatedToAllIndex", -1, leaderInstance.getCurrentBehavior().getReplicatedToAllIndex());
339 // Re-enable AppendEntries to the follower. The follower's log will be out of sync and it
340 // should force the leader to install snapshot to re-sync the entire follower's log and state.
342 followerInstance.stopDropMessages(AppendEntries.class);
343 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
// After the snapshot follower1 must match the new leader: same last index/commit index, entries in
// the current term, and applied state equal to the payloads sent after the restart.
345 assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
346 assertEquals("Follower journal lastIndex", lastIndex, follower1Context.getReplicatedLog().lastIndex());
347 assertEquals("Follower journal lastTerm", currentTerm, follower1Context.getReplicatedLog().lastTerm());
348 assertEquals("Follower commit index", lastIndex, follower1Context.getCommitIndex());
349 assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
351 testLog.info("testFollowerResyncWithMoreLeaderLogEntriesAndDownPeerAfterNonPersistentLeaderRestart ending");
355 public void testFollowerLeaderStateChanges() {
// Verifies the LeaderStateChanged notifications sent to the non-voting follower's role change
// notifier: first one with a null leader id after an ElectionTimeout while AppendEntries are
// dropped, then one carrying leaderId once AppendEntries are let through again.
356 testLog.info("testFollowerLeaderStateChanges");
// Create a collector actor to act as the role change notifier and register it on the follower's
// builder before setup, so that follower1 is created with it.
358 ActorRef roleChangeNotifier = factory.<MessageCollectorActor>createTestActor(
359 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
360 factory.generateActorId("roleChangeNotifier"));
361 follower1Builder.roleChangeNotifier(roleChangeNotifier);
// Start a voting leader and a non-voting follower (see setupLeaderAndNonVotingFollower()).
363 setupLeaderAndNonVotingFollower();
// Adjust the follower's timing configuration: election timeout factor 2, heartbeat interval 100 ms.
365 ((DefaultConfigParamsImpl)follower1Context.getConfigParams()).setElectionTimeoutFactor(2);
366 ((DefaultConfigParamsImpl)follower1Context.getConfigParams()).
367 setHeartBeatInterval(FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
// Drop notifications collected during startup, then inject an ElectionTimeout into the follower
// and make it ignore the leader's AppendEntries.
369 MessageCollectorActor.clearMessages(roleChangeNotifier);
370 follower1Actor.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
371 followerInstance.startDropMessages(AppendEntries.class);
// The follower is expected to report that it no longer knows a leader (null leader id).
373 LeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(roleChangeNotifier,
374 LeaderStateChanged.class);
375 assertEquals("getLeaderId", null, leaderStateChanged.getLeaderId());
// Let AppendEntries through again; the follower is expected to report the leader's id once more.
377 MessageCollectorActor.clearMessages(roleChangeNotifier);
378 followerInstance.stopDropMessages(AppendEntries.class);
380 leaderStateChanged = MessageCollectorActor.expectFirstMatching(roleChangeNotifier,
381 LeaderStateChanged.class);
382 assertEquals("getLeaderId", leaderId, leaderStateChanged.getLeaderId());
/**
 * Creates a fresh non-persistent leader actor under leaderId using the current peerAddresses and
 * leaderConfigParams, waits until it has become leader and refreshes the leader-related fields
 * (leaderActor, leaderInstance, leaderCollectorActor, leaderContext). Also clears expSnapshotState.
 * Callers kill the previous leader actor before invoking this method.
 */
385 private void createNewLeaderActor() {
// Expected state accumulated for the previous leader no longer applies to the new one.
386 expSnapshotState.clear();
387 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
388 config(leaderConfigParams).persistent(Optional.of(false)));
389 leaderInstance = leaderActor.underlyingActor();
390 leaderCollectorActor = leaderInstance.collectorActor();
// Block until the new actor has taken the leader role, then pick up its context.
391 waitUntilLeader(leaderActor);
392 leaderContext = leaderInstance.getRaftActorContext();
/**
 * Seeds the in-memory journal with an election term and a server configuration in which the leader
 * is voting and follower1 is non-voting, creates both actors as non-persistent, waits until the
 * leader actor has become leader and verifies term, server configuration and voting status on both
 * nodes. Populates the follower/leader actor, instance, collector and context fields and sets
 * currentTerm to initialTerm + 1.
 */
395 private void setupLeaderAndNonVotingFollower() {
// NOTE(review): presumably chosen high enough that the batch count does not trigger a snapshot
// during these tests - confirm.
396 snapshotBatchCount = 100;
399 // Set up a persisted ServerConfigurationPayload with the leader voting and the follower non-voting.
401 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
402 new ServerInfo(leaderId, true), new ServerInfo(follower1Id, false)));
403 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, initialTerm,
404 persistedServerConfig);
// Both journals get the same two entries: the election term (voted for leaderId) and the server config.
406 InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId));
407 InMemoryJournal.addEntry(leaderId, 2, persistedServerConfigEntry);
408 InMemoryJournal.addEntry(follower1Id, 1, new UpdateElectionTerm(initialTerm, leaderId));
409 InMemoryJournal.addEntry(follower1Id, 2, persistedServerConfigEntry);
// Create the follower first, from the shared follower1Builder, pointing it at the path the leader
// will be created under.
411 DefaultConfigParamsImpl followerConfigParams = newFollowerConfigParams();
412 follower1Actor = newTestRaftActor(follower1Id, follower1Builder.peerAddresses(
413 ImmutableMap.of(leaderId, testActorPath(leaderId))).config(followerConfigParams).
414 persistent(Optional.of(false)));
// The leader's only peer is follower1.
416 peerAddresses = ImmutableMap.<String, String>builder().
417 put(follower1Id, follower1Actor.path().toString()).build();
419 leaderConfigParams = newLeaderConfigParams();
420 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
421 config(leaderConfigParams).persistent(Optional.of(false)));
// Cache the underlying actors, their collector actors and their Raft contexts for use by the tests.
423 followerInstance = follower1Actor.underlyingActor();
424 follower1CollectorActor = followerInstance.collectorActor();
426 leaderInstance = leaderActor.underlyingActor();
427 leaderCollectorActor = leaderInstance.collectorActor();
429 leaderContext = leaderInstance.getRaftActorContext();
430 follower1Context = followerInstance.getRaftActorContext();
432 waitUntilLeader(leaderActor);
434 // Verify leader's context after startup
// The leader is expected to be one term past the persisted initialTerm (presumably because
// becoming leader involved a new election - confirm).
436 currentTerm = initialTerm + 1;
437 assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
438 assertEquals("Leader server config", Sets.newHashSet(persistedServerConfig.getServerConfig()),
439 Sets.newHashSet(leaderContext.getPeerServerInfo(true).getServerConfig()));
440 assertEquals("Leader isVotingMember", true, leaderContext.isVotingMember());
442 // Verify follower's context after startup
// Wait for the first AppendEntries to reach the follower before inspecting its term.
444 MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
445 assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
446 assertEquals("Follower server config", Sets.newHashSet(persistedServerConfig.getServerConfig()),
447 Sets.newHashSet(follower1Context.getPeerServerInfo(true).getServerConfig()));
448 assertEquals("FollowerisVotingMember", false, follower1Context.isVotingMember());