2 * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.getAllMatching;
18 import akka.actor.Actor;
19 import akka.actor.ActorRef;
20 import akka.actor.Props;
21 import akka.testkit.TestActorRef;
22 import com.google.common.collect.ImmutableMap;
23 import com.google.common.collect.Lists;
24 import java.util.List;
25 import java.util.concurrent.TimeUnit;
26 import org.junit.Test;
27 import org.opendaylight.controller.cluster.notifications.RoleChanged;
28 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
30 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
31 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
32 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
33 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
34 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
35 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
36 import scala.concurrent.duration.FiniteDuration;
39 * Tests isolation of nodes end-to-end.
41 * @author Thomas Pantelis
// Test class built on AbstractRaftActorIntegrationTest (the base class is not part of this listing).
43 public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
// Message collector registered as follower1's role-change notifier in createRaftActors(); the tests poll it
// for RoleChanged messages.
44 private TestActorRef<Actor> follower1NotifierActor;
// Message collector registered as the leader's role-change notifier in createRaftActors(); the tests poll it
// for RoleChanged messages.
45 private TestActorRef<Actor> leaderNotifierActor;
48 * Isolates the leader after all initial payload entries have been committed and applied on all nodes. While
49 * isolated, the majority partition elects a new leader and both sides of the partition attempt to commit one entry
50 * independently. After isolation is removed, the entry will conflict and both sides should reconcile their logs appropriately.
// Scenario exercised below: payloads "zero" and "one" are sent to the leader and checked with
// verifyApplyJournalEntries(..., 1) on the leader and both followers. The leader then receives "two", follower1
// is forced to become leader and receives "two-new", and the test finally asserts that the prior leader's
// journal and state end with "two-new" at index 2.
// NOTE(review): no calls to createRaftActors(), isolateLeader() or removeIsolation() are visible in this
// listing of the method; presumably they sit on lines missing from this view - confirm against the full file.
54 public void testLeaderIsolationWithAllPriorEntriesCommitted() throws Exception {
55 testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted starting");
59 // Send the initial payloads and verify replication.
61 final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
62 final MockPayload payload1 = sendPayloadData(leaderActor, "one");
63 verifyApplyJournalEntries(leaderCollectorActor, 1);
64 verifyApplyJournalEntries(follower1CollectorActor, 1);
65 verifyApplyJournalEntries(follower2CollectorActor, 1);
69 // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
71 testLog.info("Sending payload to isolated leader");
73 final MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
75 // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
76 // is collected but not forwarded to the follower RaftActor.
78 AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
79 assertEquals("getTerm", currentTerm, appendEntries.getTerm());
80 assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
81 assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
82 verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
84 // The leader should transition to IsolatedLeader.
86 expectFirstMatching(leaderNotifierActor, RoleChanged.class,
87 rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
// forceElectionOnFollower1() also reassigns currentTerm from follower1's term information, so the currentTerm
// used in the assertions after this call is the value read from follower1 once it has become leader.
89 forceElectionOnFollower1();
91 // Send a payload to the new leader follower1 with index 2 and verify it's replicated to follower2 and committed.
94 testLog.info("Sending payload to new leader");
96 final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
97 verifyApplyJournalEntries(follower1CollectorActor, 2);
98 verifyApplyJournalEntries(follower2CollectorActor, 2);
100 assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
101 assertEquals("Follower 1 journal last index", 2, follower1Context.getReplicatedLog().lastIndex());
102 assertEquals("Follower 1 commit index", 2, follower1Context.getCommitIndex());
103 verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(2), currentTerm, 2, newLeaderPayload2);
105 assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
106 follower1Actor.underlyingActor().getState());
110 // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
111 // with a higher term.
113 expectFirstMatching(leaderNotifierActor, RoleChanged.class,
114 rc -> rc.getNewRole().equals(RaftState.Follower.name()));
116 // The previous leader has a conflicting log entry at index 2 with a different term which should get
117 // replaced by the new leader's index 2 entry.
119 verifyApplyJournalEntries(leaderCollectorActor, 2);
121 assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
122 assertEquals("Prior leader journal last index", 2, leaderContext.getReplicatedLog().lastIndex());
123 assertEquals("Prior leader commit index", 2, leaderContext.getCommitIndex());
124 verifyReplicatedLogEntry(leaderContext.getReplicatedLog().get(2), currentTerm, 2, newLeaderPayload2);
// The isolated leader's own "two" payload must not appear in its final state; only "two-new" is expected.
126 assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
127 leaderActor.underlyingActor().getState());
129 testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted ending");
133 * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
134 * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
135 * partition attempt to commit one entry independently. After isolation is removed, the entry will conflict and both
136 * sides should reconcile their logs appropriately.
// Scenario exercised below: "zero" is checked with verifyApplyJournalEntries(..., 0) on all nodes; "one" is
// sent while both followers drop AppendEntries whose leader commit index is 1; the leader then receives "two",
// follower1 is forced to become leader and receives "two-new", and the test finally asserts that the prior
// leader ends at index 3 with state [zero, one, two-new], without applying its own index 2 / term 1 entry and
// without any InstallSnapshot message.
// NOTE(review): no calls to createRaftActors(), isolateLeader() or removeIsolation() are visible in this
// listing of the method; presumably they sit on lines missing from this view - confirm against the full file.
139 public void testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry() throws Exception {
140 testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry starting");
144 // Submit an initial payload that is committed/applied on all nodes.
146 final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
147 verifyApplyJournalEntries(leaderCollectorActor, 0);
148 verifyApplyJournalEntries(follower1CollectorActor, 0);
149 verifyApplyJournalEntries(follower2CollectorActor, 0);
151 // Submit another payload that is replicated to all followers and committed on the leader but the leader is
152 // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
153 // with the updated leader commit index.
155 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
156 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
158 MockPayload payload1 = sendPayloadData(leaderActor, "one");
160 // Wait for the leader to send AppendEntries to the followers with the new entry with index 1. This
161 // message is forwarded to the followers.
163 expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
164 return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
165 && ae.getEntries().get(0).getData().equals(payload1);
168 expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae -> {
169 return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
170 && ae.getEntries().get(0).getData().equals(payload1);
// Only the leader is checked for apply index 1 here; the followers are not expected to have applied it.
173 verifyApplyJournalEntries(leaderCollectorActor, 1);
177 // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
179 testLog.info("Sending payload to isolated leader");
181 final MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
183 // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
184 // is collected but not forwarded to the follower RaftActor.
186 AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
187 assertEquals("getTerm", currentTerm, appendEntries.getTerm());
188 assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
189 assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
190 verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
192 // The leader should transition to IsolatedLeader.
194 expectFirstMatching(leaderNotifierActor, RoleChanged.class,
195 rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
// forceElectionOnFollower1() also reassigns currentTerm from follower1's term information, so the currentTerm
// used in the assertions after this call is the value read from follower1 once it has become leader.
197 forceElectionOnFollower1();
199 // Send a payload to the new leader follower1 and verify it's replicated to follower2 and committed. Since the
200 // entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
201 // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload will have index 3.
203 testLog.info("Sending payload to new leader");
// Despite the "2" in its name, this payload is asserted below to sit at index 3.
205 final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
206 verifyApplyJournalEntries(follower1CollectorActor, 3);
207 verifyApplyJournalEntries(follower2CollectorActor, 3);
209 assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
210 assertEquals("Follower 1 journal last index", 3, follower1Context.getReplicatedLog().lastIndex());
211 assertEquals("Follower 1 commit index", 3, follower1Context.getCommitIndex());
212 verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(3), currentTerm, 3, newLeaderPayload2);
214 assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
215 follower1Actor.underlyingActor().getState());
219 // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
220 // with a higher term.
222 expectFirstMatching(leaderNotifierActor, RoleChanged.class,
223 rc -> rc.getNewRole().equals(RaftState.Follower.name()));
225 // The previous leader has a conflicting log entry at index 2 with a different term which should get
226 // replaced by the new leader's entry.
228 verifyApplyJournalEntries(leaderCollectorActor, 3);
// NOTE(review): the lambda parameter raftState is not used by the assertions visible here; they read
// leaderContext directly.
230 verifyRaftState(leaderActor, raftState -> {
231 assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
232 assertEquals("Prior leader journal last index", 3, leaderContext.getReplicatedLog().lastIndex());
233 assertEquals("Prior leader commit index", 3, leaderContext.getCommitIndex());
236 assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
237 leaderActor.underlyingActor().getState());
239 // Ensure the prior leader didn't apply its conflicting entry with index 2, term 1.
241 List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
242 for (ApplyState as: applyState) {
243 if (as.getReplicatedLogEntry().getIndex() == 2 && as.getReplicatedLogEntry().getTerm() == 1) {
244 fail("Got unexpected ApplyState: " + as);
248 // The prior leader should not have needed a snapshot installed in order to get it synced.
250 assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
252 testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry ending");
256 * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
257 * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
258 * partition attempt to commit multiple entries independently. After isolation is removed, the entries will conflict
259 * and both sides should reconcile their logs appropriately.
// Scenario exercised below: "zero" is checked with verifyApplyJournalEntries(..., 0) on all nodes; "one" is
// sent while both followers drop AppendEntries whose leader commit index is 1; the leader then receives "two",
// "three" and "four", follower1 is forced to become leader and receives "two-new", "three-new" and "four-new",
// and the test finally asserts that the prior leader ends at index 5 with the new leader's payloads in its
// state, without applying any term 1 entry and without any InstallSnapshot message.
// NOTE(review): no calls to createRaftActors(), isolateLeader() or removeIsolation() are visible in this
// listing of the method; presumably they sit on lines missing from this view - confirm against the full file.
262 public void testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries() throws Exception {
263 testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries starting");
267 // Submit an initial payload that is committed/applied on all nodes.
269 final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
270 verifyApplyJournalEntries(leaderCollectorActor, 0);
271 verifyApplyJournalEntries(follower1CollectorActor, 0);
272 verifyApplyJournalEntries(follower2CollectorActor, 0);
274 // Submit another payload that is replicated to all followers and committed on the leader but the leader is
275 // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
276 // with the updated leader commit index.
278 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
279 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
281 MockPayload payload1 = sendPayloadData(leaderActor, "one");
283 // Wait for the leader to send AppendEntries to the followers with the new entry with index 1. This
284 // message is forwarded to the followers.
286 expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
287 return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
288 && ae.getEntries().get(0).getData().equals(payload1);
291 expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae -> {
292 return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
293 && ae.getEntries().get(0).getData().equals(payload1);
// Only the leader is checked for apply index 1 here; the followers are not expected to have applied it.
296 verifyApplyJournalEntries(leaderCollectorActor, 1);
300 // Send 3 payloads to the isolated leader so it has uncommitted log entries.
302 testLog.info("Sending 3 payloads to isolated leader");
304 sendPayloadData(leaderActor, "two");
305 sendPayloadData(leaderActor, "three");
306 sendPayloadData(leaderActor, "four");
308 // Wait for the isolated leader to send AppendEntries to follower1 for each new entry. Note the messages
309 // are collected but not forwarded to the follower RaftActor.
// The predicate scans each AppendEntries for an entry with index 4; the remainder of the predicate is not
// visible in this listing.
311 expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
312 for (ReplicatedLogEntry e: ae.getEntries()) {
313 if (e.getIndex() == 4) {
320 // The leader should transition to IsolatedLeader.
322 expectFirstMatching(leaderNotifierActor, RoleChanged.class,
323 rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
// forceElectionOnFollower1() also reassigns currentTerm from follower1's term information, so the currentTerm
// used in the assertions after this call is the value read from follower1 once it has become leader.
325 forceElectionOnFollower1();
327 // Send 3 payloads to the new leader follower1 and verify they're replicated to follower2 and committed. Since
328 // the entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
329 // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload indices will start at 3.
331 testLog.info("Sending 3 payloads to new leader");
// Despite the numbers in their names, these payloads are expected at indices 3 to 5 (index 5 is asserted below).
333 final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
334 final MockPayload newLeaderPayload3 = sendPayloadData(follower1Actor, "three-new");
335 final MockPayload newLeaderPayload4 = sendPayloadData(follower1Actor, "four-new");
336 verifyApplyJournalEntries(follower1CollectorActor, 5);
337 verifyApplyJournalEntries(follower2CollectorActor, 5);
339 assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
340 assertEquals("Follower 1 journal last index", 5, follower1Context.getReplicatedLog().lastIndex());
341 assertEquals("Follower 1 commit index", 5, follower1Context.getCommitIndex());
342 verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(5), currentTerm, 5, newLeaderPayload4);
344 assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2, newLeaderPayload3,
345 newLeaderPayload4), follower1Actor.underlyingActor().getState());
349 // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
350 // with a higher term.
352 expectFirstMatching(leaderNotifierActor, RoleChanged.class,
353 rc -> rc.getNewRole().equals(RaftState.Follower.name()));
355 // The previous leader has conflicting log entries starting at index 2 with different terms which should get
356 // replaced by the new leader's entries.
358 verifyApplyJournalEntries(leaderCollectorActor, 5);
// NOTE(review): the lambda parameter raftState is not used by the assertions visible here; they read
// leaderContext directly.
360 verifyRaftState(leaderActor, raftState -> {
361 assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
362 assertEquals("Prior leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
363 assertEquals("Prior leader commit index", 5, leaderContext.getCommitIndex());
366 assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2, newLeaderPayload3,
367 newLeaderPayload4), leaderActor.underlyingActor().getState());
369 // Ensure the prior leader didn't apply any of its conflicting entries with term 1.
371 List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
372 for (ApplyState as: applyState) {
373 if (as.getReplicatedLogEntry().getTerm() == 1) {
374 fail("Got unexpected ApplyState: " + as);
378 // The prior leader should not have needed a snapshot installed in order to get it synced.
380 assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
382 testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries ending");
// Ends the leader's isolation: clears the messages collected so far by the leader's notifier and collector
// actors, then stops the message dropping that isolateLeader() started on the leader and on both followers.
385 private void removeIsolation() {
386 testLog.info("Removing isolation");
// Presumably cleared so that later expectations on these collectors only see post-isolation messages.
388 clearMessages(leaderNotifierActor);
389 clearMessages(leaderCollectorActor);
// Counterparts of the startDropMessages calls in isolateLeader().
// NOTE(review): on the followers this stops dropping for the whole AppendEntries class, which presumably also
// removes any other AppendEntries drop predicate installed by a test - confirm against stopDropMessages.
391 leaderActor.underlyingActor().stopDropMessages(AppendEntries.class);
392 leaderActor.underlyingActor().stopDropMessages(RequestVote.class);
393 follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
394 follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
// Makes follower1 the leader: sends it TimeoutNow, waits until follower1's notifier has collected a RoleChanged
// whose new role is Leader, then refreshes the currentTerm field from follower1's term information.
397 private void forceElectionOnFollower1() {
398 // Force follower1 to start an election. follower2 should grant the vote.
400 testLog.info("Forcing election on {}", follower1Id);
402 follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
404 expectFirstMatching(follower1NotifierActor, RoleChanged.class,
405 rc -> rc.getNewRole().equals(RaftState.Leader.name()));
// Callers use currentTerm in their later assertions, so it must reflect the term read from follower1 here.
407 currentTerm = follower1Context.getTermInformation().getCurrentTerm();
// Starts the leader's isolation by installing message drops on the leader and on both followers, then clears
// the messages collected so far by follower1's collector, follower1's notifier and the leader's notifier.
410 private void isolateLeader() {
411 // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
413 testLog.info("Isolating the leader");
// On the leader: drop every AppendEntries and RequestVote message.
415 leaderActor.underlyingActor().startDropMessages(AppendEntries.class);
416 leaderActor.underlyingActor().startDropMessages(RequestVote.class);
// On the followers: drop only AppendEntries whose leader id is the isolated leader's id, so AppendEntries
// from a different leader are not matched by this predicate.
418 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class,
419 ae -> ae.getLeaderId().equals(leaderId));
420 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class,
421 ae -> ae.getLeaderId().equals(leaderId));
// Note that follower2's collector is not cleared here.
423 clearMessages(follower1CollectorActor);
424 clearMessages(follower1NotifierActor);
425 clearMessages(leaderNotifierActor);
// Builds the three-node setup used by the tests: creates follower1 (with a role-change notifier), follower2 and
// the leader (with a role-change notifier), forces the leader actor to start an election, waits until it is
// leader, and then records the collector actors, actor contexts and current term in fields.
428 private void createRaftActors() {
429 testLog.info("createRaftActors starting");
431 follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
432 factory.generateActorId(follower1Id + "-notifier"));
// Both followers share this config: 100 ms heartbeat interval and an election timeout factor of 1000.
// Presumably the large factor keeps the followers from starting elections on their own, so elections only
// happen when a test sends TimeoutNow - confirm against DefaultConfigParamsImpl.
434 DefaultConfigParamsImpl followerConfigParams = new DefaultConfigParamsImpl();
435 followerConfigParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
436 followerConfigParams.setElectionTimeoutFactor(1000);
437 follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
438 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id)))
439 .config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
// follower2 is created without a role-change notifier.
441 follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
442 follower1Id, testActorPath(follower1Id)), followerConfigParams);
// The leader's peer addresses are taken from the paths of the follower actors created above.
444 peerAddresses = ImmutableMap.<String, String>builder()
445 .put(follower1Id, follower1Actor.path().toString())
446 .put(follower2Id, follower2Actor.path().toString()).build();
// The leader checks for isolation every 500 ms; the tests wait for the resulting IsolatedLeader role change.
448 leaderConfigParams = newLeaderConfigParams();
449 leaderConfigParams.setIsolatedLeaderCheckInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
451 leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
452 factory.generateActorId(leaderId + "-notifier"));
454 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
455 .config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
457 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
458 follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
459 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
// Force the designated leader actor to start an election instead of waiting for an election timeout.
461 leaderActor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
462 waitUntilLeader(leaderActor);
// Wait for two AppendEntriesReply messages on the leader (presumably one per follower) before continuing.
464 expectMatching(leaderCollectorActor, AppendEntriesReply.class, 2);
// Discard the messages collected during setup so the tests start from empty collectors.
467 clearMessages(leaderCollectorActor);
468 clearMessages(follower1CollectorActor);
469 clearMessages(follower2CollectorActor);
471 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
472 currentTerm = leaderContext.getTermInformation().getCurrentTerm();
474 follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
475 follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
477 testLog.info("createRaftActors ending");