2 * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.getAllMatching;
18 import akka.actor.ActorRef;
19 import com.google.common.collect.ImmutableMap;
20 import com.google.common.collect.Lists;
21 import java.util.List;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.notifications.RoleChanged;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
26 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
27 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
28 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
29 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
30 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
31 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
32 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
33 import scala.concurrent.duration.FiniteDuration;
/**
 * Tests isolation of nodes end-to-end.
 *
 * @author Thomas Pantelis
 */
40 public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
41 private ActorRef follower1NotifierActor;
42 private ActorRef leaderNotifierActor;
45 * Isolates the leader after all initial payload entries have been committed and applied on all nodes. While
46 * isolated, the majority partition elects a new leader and both sides of the partition attempt to commit one entry
47 * independently. After isolation is removed, the entry will conflict and both sides should reconcile their logs
51 public void testLeaderIsolationWithAllPriorEntriesCommitted() {
52 testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted starting");
56 // Send an initial payloads and verify replication.
58 final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
59 final MockPayload payload1 = sendPayloadData(leaderActor, "one");
60 verifyApplyJournalEntries(leaderCollectorActor, 1);
61 verifyApplyJournalEntries(follower1CollectorActor, 1);
62 verifyApplyJournalEntries(follower2CollectorActor, 1);
66 // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
68 testLog.info("Sending payload to isolated leader");
70 final MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
72 // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
73 // is collected but not forwarded to the follower RaftActor.
75 AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
76 assertEquals("getTerm", currentTerm, appendEntries.getTerm());
77 assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
78 assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
79 verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
81 // The leader should transition to IsolatedLeader.
83 expectFirstMatching(leaderNotifierActor, RoleChanged.class,
84 rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
86 forceElectionOnFollower1();
88 // Send a payload to the new leader follower1 with index 2 and verify it's replicated to follower2
91 testLog.info("Sending payload to new leader");
93 final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
94 verifyApplyJournalEntries(follower1CollectorActor, 2);
95 verifyApplyJournalEntries(follower2CollectorActor, 2);
97 assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
98 assertEquals("Follower 1 journal last index", 2, follower1Context.getReplicatedLog().lastIndex());
99 assertEquals("Follower 1 commit index", 2, follower1Context.getCommitIndex());
100 verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(2), currentTerm, 2, newLeaderPayload2);
102 assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
103 follower1Actor.underlyingActor().getState());
107 // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
108 // with a higher term.
110 expectFirstMatching(leaderNotifierActor, RoleChanged.class,
111 rc -> rc.getNewRole().equals(RaftState.Follower.name()));
113 // The previous leader has a conflicting log entry at index 2 with a different term which should get
114 // replaced by the new leader's index 1 entry.
116 verifyApplyJournalEntries(leaderCollectorActor, 2);
118 assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
119 assertEquals("Prior leader journal last index", 2, leaderContext.getReplicatedLog().lastIndex());
120 assertEquals("Prior leader commit index", 2, leaderContext.getCommitIndex());
121 verifyReplicatedLogEntry(leaderContext.getReplicatedLog().get(2), currentTerm, 2, newLeaderPayload2);
123 assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
124 leaderActor.underlyingActor().getState());
126 testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted ending");
130 * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
131 * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
132 * partition attempt to commit one entry independently. After isolation is removed, the entry will conflict and both
133 * sides should reconcile their logs appropriately.
136 public void testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry() {
137 testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry starting");
141 // Submit an initial payload that is committed/applied on all nodes.
143 final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
144 verifyApplyJournalEntries(leaderCollectorActor, 0);
145 verifyApplyJournalEntries(follower1CollectorActor, 0);
146 verifyApplyJournalEntries(follower2CollectorActor, 0);
148 // Submit another payload that is replicated to all followers and committed on the leader but the leader is
149 // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
150 // with the updated leader commit index.
152 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
153 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
155 MockPayload payload1 = sendPayloadData(leaderActor, "one");
157 // Wait for the isolated leader to send AppendEntries to the followers with the new entry with index 1. This
158 // message is forwarded to the followers.
160 expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae ->
161 ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
162 && ae.getEntries().get(0).getData().equals(payload1));
164 expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae ->
165 ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
166 && ae.getEntries().get(0).getData().equals(payload1));
168 verifyApplyJournalEntries(leaderCollectorActor, 1);
172 // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
174 testLog.info("Sending payload to isolated leader");
176 final MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
178 // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
179 // is collected but not forwarded to the follower RaftActor.
181 AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
182 assertEquals("getTerm", currentTerm, appendEntries.getTerm());
183 assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
184 assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
185 verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
187 // The leader should transition to IsolatedLeader.
189 expectFirstMatching(leaderNotifierActor, RoleChanged.class,
190 rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
192 forceElectionOnFollower1();
194 // Send a payload to the new leader follower1 and verify it's replicated to follower2 and committed. Since the
195 // entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
196 // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload will have index 3.
198 testLog.info("Sending payload to new leader");
200 final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
201 verifyApplyJournalEntries(follower1CollectorActor, 3);
202 verifyApplyJournalEntries(follower2CollectorActor, 3);
204 assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
205 assertEquals("Follower 1 journal last index", 3, follower1Context.getReplicatedLog().lastIndex());
206 assertEquals("Follower 1 commit index", 3, follower1Context.getCommitIndex());
207 verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(3), currentTerm, 3, newLeaderPayload2);
209 assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
210 follower1Actor.underlyingActor().getState());
214 // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
215 // with a higher term.
217 expectFirstMatching(leaderNotifierActor, RoleChanged.class,
218 rc -> rc.getNewRole().equals(RaftState.Follower.name()));
220 // The previous leader has a conflicting log entry at index 2 with a different term which should get
221 // replaced by the new leader's entry.
223 verifyApplyJournalEntries(leaderCollectorActor, 3);
225 verifyRaftState(leaderActor, raftState -> {
226 assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
227 assertEquals("Prior leader journal last index", 3, leaderContext.getReplicatedLog().lastIndex());
228 assertEquals("Prior leader commit index", 3, leaderContext.getCommitIndex());
231 assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
232 leaderActor.underlyingActor().getState());
234 // Ensure the prior leader didn't apply its conflicting entry with index 2, term 1.
236 List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
237 for (ApplyState as: applyState) {
238 if (as.getReplicatedLogEntry().getIndex() == 2 && as.getReplicatedLogEntry().getTerm() == 1) {
239 fail("Got unexpected ApplyState: " + as);
243 // The prior leader should not have needed a snapshot installed in order to get it synced.
245 assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
247 testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry ending");
251 * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
252 * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
253 * partition attempt to commit multiple entries independently. After isolation is removed, the entries will conflict
254 * and both sides should reconcile their logs appropriately.
257 public void testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries() {
258 testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries starting");
262 // Submit an initial payload that is committed/applied on all nodes.
264 final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
265 verifyApplyJournalEntries(leaderCollectorActor, 0);
266 verifyApplyJournalEntries(follower1CollectorActor, 0);
267 verifyApplyJournalEntries(follower2CollectorActor, 0);
269 // Submit another payload that is replicated to all followers and committed on the leader but the leader is
270 // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
271 // with the updated leader commit index.
273 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
274 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
276 MockPayload payload1 = sendPayloadData(leaderActor, "one");
278 // Wait for the isolated leader to send AppendEntries to the followers with the new entry with index 1. This
279 // message is forwarded to the followers.
281 expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae ->
282 ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
283 && ae.getEntries().get(0).getData().equals(payload1));
285 expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae ->
286 ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
287 && ae.getEntries().get(0).getData().equals(payload1));
289 verifyApplyJournalEntries(leaderCollectorActor, 1);
293 // Send 3 payloads to the isolated leader so it has uncommitted log entries.
295 testLog.info("Sending 3 payloads to isolated leader");
297 sendPayloadData(leaderActor, "two");
298 sendPayloadData(leaderActor, "three");
299 sendPayloadData(leaderActor, "four");
301 // Wait for the isolated leader to send AppendEntries to follower1 for each new entry. Note the messages
302 // are collected but not forwarded to the follower RaftActor.
304 expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
305 for (ReplicatedLogEntry e: ae.getEntries()) {
306 if (e.getIndex() == 4) {
313 // The leader should transition to IsolatedLeader.
315 expectFirstMatching(leaderNotifierActor, RoleChanged.class,
316 rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
318 forceElectionOnFollower1();
320 // Send 3 payloads to the new leader follower1 and verify they're replicated to follower2 and committed. Since
321 // the entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
322 // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload indices will start at 3.
324 testLog.info("Sending 3 payloads to new leader");
326 final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
327 final MockPayload newLeaderPayload3 = sendPayloadData(follower1Actor, "three-new");
328 final MockPayload newLeaderPayload4 = sendPayloadData(follower1Actor, "four-new");
329 verifyApplyJournalEntries(follower1CollectorActor, 5);
330 verifyApplyJournalEntries(follower2CollectorActor, 5);
332 assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
333 assertEquals("Follower 1 journal last index", 5, follower1Context.getReplicatedLog().lastIndex());
334 assertEquals("Follower 1 commit index", 5, follower1Context.getCommitIndex());
335 verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(5), currentTerm, 5, newLeaderPayload4);
337 assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2, newLeaderPayload3,
338 newLeaderPayload4), follower1Actor.underlyingActor().getState());
342 // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
343 // with a higher term.
345 expectFirstMatching(leaderNotifierActor, RoleChanged.class,
346 rc -> rc.getNewRole().equals(RaftState.Follower.name()));
348 // The previous leader has conflicting log entries starting at index 2 with different terms which should get
349 // replaced by the new leader's entries.
351 verifyApplyJournalEntries(leaderCollectorActor, 5);
353 verifyRaftState(leaderActor, raftState -> {
354 assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
355 assertEquals("Prior leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
356 assertEquals("Prior leader commit index", 5, leaderContext.getCommitIndex());
359 assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2, newLeaderPayload3,
360 newLeaderPayload4), leaderActor.underlyingActor().getState());
362 // Ensure the prior leader didn't apply any of its conflicting entries with term 1.
364 List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
365 for (ApplyState as: applyState) {
366 if (as.getReplicatedLogEntry().getTerm() == 1) {
367 fail("Got unexpected ApplyState: " + as);
371 // The prior leader should not have needed a snapshot installed in order to get it synced.
373 assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
375 testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries ending");
378 private void removeIsolation() {
379 testLog.info("Removing isolation");
381 clearMessages(leaderNotifierActor);
382 clearMessages(leaderCollectorActor);
384 leaderActor.underlyingActor().stopDropMessages(AppendEntries.class);
385 leaderActor.underlyingActor().stopDropMessages(RequestVote.class);
386 follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
387 follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
390 private void forceElectionOnFollower1() {
391 // Force follower1 to start an election. follower2 should grant the vote.
393 testLog.info("Forcing election on {}", follower1Id);
395 follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
397 expectFirstMatching(follower1NotifierActor, RoleChanged.class,
398 rc -> rc.getNewRole().equals(RaftState.Leader.name()));
400 currentTerm = follower1Context.getTermInformation().getCurrentTerm();
403 private void isolateLeader() {
404 // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
406 testLog.info("Isolating the leader");
408 leaderActor.underlyingActor().startDropMessages(AppendEntries.class);
409 leaderActor.underlyingActor().startDropMessages(RequestVote.class);
411 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class,
412 ae -> ae.getLeaderId().equals(leaderId));
413 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class,
414 ae -> ae.getLeaderId().equals(leaderId));
416 clearMessages(follower1CollectorActor);
417 clearMessages(follower1NotifierActor);
418 clearMessages(leaderNotifierActor);
421 private void createRaftActors() {
422 testLog.info("createRaftActors starting");
424 follower1NotifierActor = factory.createActor(MessageCollectorActor.props(),
425 factory.generateActorId(follower1Id + "-notifier"));
427 DefaultConfigParamsImpl followerConfigParams = new DefaultConfigParamsImpl();
428 followerConfigParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
429 followerConfigParams.setElectionTimeoutFactor(1000);
430 follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
431 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id)))
432 .config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
434 follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
435 follower1Id, testActorPath(follower1Id)), followerConfigParams);
437 peerAddresses = ImmutableMap.<String, String>builder()
438 .put(follower1Id, follower1Actor.path().toString())
439 .put(follower2Id, follower2Actor.path().toString()).build();
441 leaderConfigParams = newLeaderConfigParams();
442 leaderConfigParams.setIsolatedLeaderCheckInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
444 leaderNotifierActor = factory.createActor(MessageCollectorActor.props(),
445 factory.generateActorId(leaderId + "-notifier"));
447 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
448 .config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
450 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
451 follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
452 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
454 leaderActor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
455 waitUntilLeader(leaderActor);
457 expectMatching(leaderCollectorActor, AppendEntriesReply.class, 2);
460 clearMessages(leaderCollectorActor);
461 clearMessages(follower1CollectorActor);
462 clearMessages(follower2CollectorActor);
464 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
465 currentTerm = leaderContext.getTermInformation().getCurrentTerm();
467 follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
468 follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
470 testLog.info("createRaftActors ending");