2 * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
12 import static org.opendaylight.controller.cluster.raft.MessageCollectorActor.assertNoneMatching;
13 import static org.opendaylight.controller.cluster.raft.MessageCollectorActor.clearMessages;
14 import static org.opendaylight.controller.cluster.raft.MessageCollectorActor.expectFirstMatching;
15 import static org.opendaylight.controller.cluster.raft.MessageCollectorActor.expectMatching;
16 import static org.opendaylight.controller.cluster.raft.MessageCollectorActor.getAllMatching;
18 import com.google.common.collect.ImmutableMap;
19 import java.time.Duration;
20 import java.util.List;
21 import org.apache.pekko.actor.ActorRef;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.notifications.RoleChanged;
24 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
25 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
29 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
30 import org.opendaylight.raft.api.RaftRole;
33 * Tests isolation of nodes end-to-end.
35 * @author Thomas Pantelis
37 public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
38 private ActorRef follower1NotifierActor;
39 private ActorRef leaderNotifierActor;
42 * Isolates the leader after all initial payload entries have been committed and applied on all nodes. While
43 * isolated, the majority partition elects a new leader and both sides of the partition attempt to commit one entry
44 * independently. After isolation is removed, the entry will conflict and both sides should reconcile their logs
48 public void testLeaderIsolationWithAllPriorEntriesCommitted() {
// The isolated leader (old term) and follower1 (new term) each append a different entry at index 2; once the
// partition heals, the prior leader must hold, commit and apply follower1's entry instead of its own.
// NOTE(review): no createRaftActors()/isolateLeader()/removeIsolation() call is visible in this view of the
// method, yet the steps below depend on setup, partition and heal having happened - confirm they are present.
49 testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted starting");
53 // Send initial payloads and verify replication.
55 final MockCommand payload0 = sendPayloadData(leaderActor, "zero");
56 final MockCommand payload1 = sendPayloadData(leaderActor, "one");
// Checked on all three collectors, so indices 0 and 1 are applied on every node before the partition.
57 verifyApplyJournalEntries(leaderCollectorActor, 1);
58 verifyApplyJournalEntries(follower1CollectorActor, 1);
59 verifyApplyJournalEntries(follower2CollectorActor, 1);
63 // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
65 testLog.info("Sending payload to isolated leader");
67 final MockCommand isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
69 // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
70 // is collected but not forwarded to the follower RaftActor.
// NOTE(review): this takes the first AppendEntries of any shape; if one without entries (e.g. a heartbeat)
// reaches the collector first, the size assertion below fails. A predicate on the entry index would be sturdier.
72 AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
73 assertEquals("getTerm", currentTerm, appendEntries.getTerm());
74 assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
75 assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
76 verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
78 // The leader should transition to IsolatedLeader.
80 expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.newRole().equals(RaftRole.IsolatedLeader));
// Makes follower1 the leader; from here on currentTerm holds follower1's (new) term.
82 forceElectionOnFollower1();
84 // Send a payload to the new leader follower1 with index 2 and verify it's replicated to follower2 and committed.
87 testLog.info("Sending payload to new leader");
89 final MockCommand newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
90 verifyApplyJournalEntries(follower1CollectorActor, 2);
91 verifyApplyJournalEntries(follower2CollectorActor, 2);
93 final var follower1log = follower1Context.getReplicatedLog();
94 assertEquals("Follower 1 journal last term", currentTerm, follower1log.lastTerm());
95 assertEquals("Follower 1 journal last index", 2, follower1log.lastIndex());
96 assertEquals("Follower 1 commit index", 2, follower1log.getCommitIndex());
97 verifyReplicatedLogEntry(follower1log.get(2), currentTerm, 2, newLeaderPayload2);
99 assertEquals("Follower 1 state", List.of(payload0, payload1, newLeaderPayload2),
100 follower1Actor.underlyingActor().getState());
104 // Previous leader should switch to follower because it will receive either an AppendEntries or
105 // AppendEntriesReply with a higher term.
107 expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.newRole().equals(RaftRole.Follower));
109 // The previous leader has a conflicting log entry at index 2 with a different term which should get
110 // replaced by the new leader's index 2 entry.
112 verifyApplyJournalEntries(leaderCollectorActor, 2);
114 final var leaderLog = leaderContext.getReplicatedLog();
115 assertEquals("Prior leader journal last term", currentTerm, leaderLog.lastTerm());
116 assertEquals("Prior leader journal last index", 2, leaderLog.lastIndex());
117 assertEquals("Prior leader commit index", 2, leaderLog.getCommitIndex());
118 verifyReplicatedLogEntry(leaderLog.get(2), currentTerm, 2, newLeaderPayload2);
120 assertEquals("Prior leader state", List.of(payload0, payload1, newLeaderPayload2),
121 leaderActor.underlyingActor().getState());
123 testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted ending");
127 * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
128 * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
129 * partition attempt to commit one entry independently. After isolation is removed, the entry will conflict and both
130 * sides should reconcile their logs appropriately.
133 public void testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry() {
// Index 1 is committed on the leader only. After follower1 wins the election it must also commit a NoopPayload
// at index 2, which conflicts with the isolated leader's own index 2 entry.
// NOTE(review): no createRaftActors()/isolateLeader()/removeIsolation() call is visible in this view of the
// method, yet the steps below depend on setup, partition and heal having happened - confirm they are present.
134 testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry starting");
138 // Submit an initial payload that is committed/applied on all nodes.
140 final MockCommand payload0 = sendPayloadData(leaderActor, "zero");
141 verifyApplyJournalEntries(leaderCollectorActor, 0);
142 verifyApplyJournalEntries(follower1CollectorActor, 0);
143 verifyApplyJournalEntries(follower2CollectorActor, 0);
145 // Submit another payload that is replicated to all followers and committed on the leader but the leader is
146 // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
147 // with the updated leader commit index.
// Both followers drop any AppendEntries carrying leaderCommit == 1, so they receive entry 1 but never learn
// that it was committed.
149 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
150 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
152 MockCommand payload1 = sendPayloadData(leaderActor, "one");
154 // Wait for the leader (not yet isolated) to send AppendEntries to the followers with the new entry with index 1.
155 // This message is forwarded to the followers.
157 expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae ->
158 ae.getEntries().size() == 1 && ae.getEntries().getFirst().index() == 1
159 && ae.getEntries().getFirst().command().equals(payload1));
161 expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae ->
162 ae.getEntries().size() == 1 && ae.getEntries().getFirst().index() == 1
163 && ae.getEntries().getFirst().command().equals(payload1));
// Only the leader is expected to apply index 1; the followers never see leaderCommit == 1.
165 verifyApplyJournalEntries(leaderCollectorActor, 1);
169 // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
171 testLog.info("Sending payload to isolated leader");
173 final MockCommand isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
175 // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
176 // is collected but not forwarded to the follower RaftActor.
// NOTE(review): this takes the first AppendEntries of any shape; if one without entries (e.g. a heartbeat)
// reaches the collector first, the size assertion below fails. A predicate on the entry index would be sturdier.
178 AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
179 assertEquals("getTerm", currentTerm, appendEntries.getTerm());
180 assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
181 assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
182 verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
184 // The leader should transition to IsolatedLeader.
186 expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.newRole().equals(RaftRole.IsolatedLeader));
// Makes follower1 the leader; from here on currentTerm holds follower1's (new) term.
188 forceElectionOnFollower1();
190 // Send a payload to the new leader follower1 and verify it's replicated to follower2 and committed. Since the
191 // entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
192 // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload will have index 3.
194 testLog.info("Sending payload to new leader");
196 final MockCommand newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
197 verifyApplyJournalEntries(follower1CollectorActor, 3);
198 verifyApplyJournalEntries(follower2CollectorActor, 3);
200 final var follower1log = follower1Context.getReplicatedLog();
201 assertEquals("Follower 1 journal last term", currentTerm, follower1log.lastTerm());
202 assertEquals("Follower 1 journal last index", 3, follower1log.lastIndex());
203 assertEquals("Follower 1 commit index", 3, follower1log.getCommitIndex());
204 verifyReplicatedLogEntry(follower1log.get(3), currentTerm, 3, newLeaderPayload2);
// The NoopPayload at index 2 does not appear in the applied state, only the three MockCommands.
206 assertEquals("Follower 1 state", List.of(payload0, payload1, newLeaderPayload2),
207 follower1Actor.underlyingActor().getState());
211 // Previous leader should switch to follower because it will receive either an AppendEntries or
212 // AppendEntriesReply with a higher term.
214 expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.newRole().equals(RaftRole.Follower));
216 // The previous leader has a conflicting log entry at index 2 with a different term which should get
217 // replaced by the new leader's entry (the NoopPayload at index 2).
219 verifyApplyJournalEntries(leaderCollectorActor, 3);
221 verifyRaftState(leaderActor, raftState -> {
222 final var leaderLog = leaderContext.getReplicatedLog();
223 assertEquals("Prior leader journal last term", currentTerm, leaderLog.lastTerm());
224 assertEquals("Prior leader journal last index", 3, leaderLog.lastIndex());
225 assertEquals("Prior leader commit index", 3, leaderLog.getCommitIndex());
228 assertEquals("Prior leader state", List.of(payload0, payload1, newLeaderPayload2),
229 leaderActor.underlyingActor().getState());
231 // Ensure the prior leader didn't apply its conflicting entry with index 2, term 1.
// NOTE(review): term 1 is hard-coded as the original leader's term; if that term were different this check
// would pass vacuously. Comparing against the term captured before forceElectionOnFollower1() would be sturdier.
233 final var applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
234 for (var as : applyState) {
235 if (as.entry().index() == 2 && as.entry().term() == 1) {
236 fail("Got unexpected ApplyState: " + as);
240 // The prior leader should not have needed a snapshot installed in order to get it synced.
242 assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
244 testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry ending");
248 * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
249 * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
250 * partition attempt to commit multiple entries independently. After isolation is removed, the entries will conflict
251 * and both sides should reconcile their logs appropriately.
254 public void testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries() {
// Like the single-conflict variant, but the isolated leader appends three entries (indices 2-4) and the new
// leader writes a NoopPayload at 2 plus three payloads at 3-5, so several consecutive entries must be replaced.
// NOTE(review): no createRaftActors()/isolateLeader()/removeIsolation() call is visible in this view of the
// method, yet the steps below depend on setup, partition and heal having happened - confirm they are present.
255 testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries starting");
259 // Submit an initial payload that is committed/applied on all nodes.
261 final MockCommand payload0 = sendPayloadData(leaderActor, "zero");
262 verifyApplyJournalEntries(leaderCollectorActor, 0);
263 verifyApplyJournalEntries(follower1CollectorActor, 0);
264 verifyApplyJournalEntries(follower2CollectorActor, 0);
266 // Submit another payload that is replicated to all followers and committed on the leader but the leader is
267 // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
268 // with the updated leader commit index.
// Both followers drop any AppendEntries carrying leaderCommit == 1, so they receive entry 1 but never learn
// that it was committed.
270 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
271 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
273 MockCommand payload1 = sendPayloadData(leaderActor, "one");
275 // Wait for the leader (not yet isolated) to send AppendEntries to the followers with the new entry with index 1.
276 // This message is forwarded to the followers.
278 expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae ->
279 ae.getEntries().size() == 1 && ae.getEntries().getFirst().index() == 1
280 && ae.getEntries().getFirst().command().equals(payload1));
282 expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae ->
283 ae.getEntries().size() == 1 && ae.getEntries().getFirst().index() == 1
284 && ae.getEntries().getFirst().command().equals(payload1));
// Only the leader is expected to apply index 1; the followers never see leaderCommit == 1.
286 verifyApplyJournalEntries(leaderCollectorActor, 1);
290 // Send 3 payloads to the isolated leader so it has uncommitted log entries (indices 2, 3 and 4).
292 testLog.info("Sending 3 payloads to isolated leader");
294 sendPayloadData(leaderActor, "two");
295 sendPayloadData(leaderActor, "three");
296 sendPayloadData(leaderActor, "four");
298 // Wait for the isolated leader to send AppendEntries to follower1 for each new entry. Note the messages
299 // are collected but not forwarded to the follower RaftActor.
// The match below waits for an AppendEntries that contains the entry at index 4, the last of the three.
301 expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
302 for (var entry : ae.getEntries()) {
303 if (entry.index() == 4) {
310 // The leader should transition to IsolatedLeader.
312 expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.newRole().equals(RaftRole.IsolatedLeader));
// Makes follower1 the leader; from here on currentTerm holds follower1's (new) term.
314 forceElectionOnFollower1();
316 // Send 3 payloads to the new leader follower1 and verify they're replicated to follower2 and committed. Since
317 // the entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
318 // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload indices will start at 3.
320 testLog.info("Sending 3 payloads to new leader");
322 final var newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
323 final var newLeaderPayload3 = sendPayloadData(follower1Actor, "three-new");
324 final var newLeaderPayload4 = sendPayloadData(follower1Actor, "four-new");
325 verifyApplyJournalEntries(follower1CollectorActor, 5);
326 verifyApplyJournalEntries(follower2CollectorActor, 5);
328 final var follower1log = follower1Context.getReplicatedLog();
329 assertEquals("Follower 1 journal last term", currentTerm, follower1log.lastTerm());
330 assertEquals("Follower 1 journal last index", 5, follower1log.lastIndex());
331 assertEquals("Follower 1 commit index", 5, follower1log.getCommitIndex());
332 verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(5), currentTerm, 5, newLeaderPayload4);
334 assertEquals("Follower 1 state", List.of(payload0, payload1, newLeaderPayload2, newLeaderPayload3,
335 newLeaderPayload4), follower1Actor.underlyingActor().getState());
339 // Previous leader should switch to follower because it will receive either an AppendEntries or
340 // AppendEntriesReply with a higher term.
342 expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.newRole().equals(RaftRole.Follower));
344 // The previous leader has conflicting log entries starting at index 2 with different terms which should get
345 // replaced by the new leader's entries.
347 verifyApplyJournalEntries(leaderCollectorActor, 5);
349 verifyRaftState(leaderActor, raftState -> {
350 final var leaderLog = leaderContext.getReplicatedLog();
351 assertEquals("Prior leader journal last term", currentTerm, leaderLog.lastTerm());
352 assertEquals("Prior leader journal last index", 5, leaderLog.lastIndex());
353 assertEquals("Prior leader commit index", 5, leaderLog.getCommitIndex());
356 assertEquals("Prior leader state",
357 List.of(payload0, payload1, newLeaderPayload2, newLeaderPayload3, newLeaderPayload4),
358 leaderActor.underlyingActor().getState());
360 // Ensure the prior leader didn't apply any of its conflicting entries with term 1.
// NOTE(review): this rejects every ApplyState with term 1. Entries 0 and 1 were also appended in the original
// leader's term and applied by it before the partition, so the check relies on leaderCollectorActor having been
// cleared since then (removeIsolation() clears it) and on that original term being 1 - confirm both.
362 final var applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
363 for (var as : applyState) {
364 if (as.entry().term() == 1) {
365 fail("Got unexpected ApplyState: " + as);
369 // The prior leader should not have needed a snapshot installed in order to get it synced.
371 assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
373 testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries ending");
376 private void removeIsolation() {
377 testLog.info("Removing isolation");
379 clearMessages(leaderNotifierActor);
380 clearMessages(leaderCollectorActor);
382 leaderActor.underlyingActor().stopDropMessages(AppendEntries.class);
383 leaderActor.underlyingActor().stopDropMessages(RequestVote.class);
384 follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
385 follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
388 private void forceElectionOnFollower1() {
389 // Force follower1 to start an election. follower2 should grant the vote.
391 testLog.info("Forcing election on {}", follower1Id);
393 follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
395 expectFirstMatching(follower1NotifierActor, RoleChanged.class, rc -> rc.newRole().equals(RaftRole.Leader));
397 currentTerm = follower1Context.currentTerm();
400 private void isolateLeader() {
401 // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
403 testLog.info("Isolating the leader");
405 leaderActor.underlyingActor().startDropMessages(AppendEntries.class);
406 leaderActor.underlyingActor().startDropMessages(RequestVote.class);
408 follower1Actor.underlyingActor().startDropMessages(AppendEntries.class,
409 ae -> ae.getLeaderId().equals(leaderId));
410 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class,
411 ae -> ae.getLeaderId().equals(leaderId));
413 clearMessages(follower1CollectorActor);
414 clearMessages(follower1NotifierActor);
415 clearMessages(leaderNotifierActor);
418 private void createRaftActors() {
// Builds the three-node cluster (leaderId, follower1Id, follower2Id), makes leaderId the leader, and records
// the contexts and term the tests use. The leader and follower1 get dedicated role-change notifier actors;
// follower2 does not.
419 testLog.info("createRaftActors starting");
421 follower1NotifierActor = factory.createActor(MessageCollectorActor.props(),
422 factory.generateActorId(follower1Id + "-notifier"));
// Follower config: 100 ms heartbeat and an election timeout factor of 1000.
// NOTE(review): presumably the large factor keeps followers from timing out on their own, so elections happen
// only when forced via TimeoutNow - confirm against DefaultConfigParamsImpl.
424 DefaultConfigParamsImpl followerConfigParams = new DefaultConfigParamsImpl();
425 followerConfigParams.setHeartBeatInterval(Duration.ofMillis(100));
426 followerConfigParams.setElectionTimeoutFactor(1000);
427 follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder()
428 .peerAddresses(ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id)))
429 .config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
// follower2 shares follower1's config object but is created without a role-change notifier.
431 follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
432 follower1Id, testActorPath(follower1Id)), followerConfigParams);
// The leader addresses its peers through the followers' actual actor paths.
434 peerAddresses = ImmutableMap.<String, String>builder()
435 .put(follower1Id, follower1Actor.path().toString())
436 .put(follower2Id, follower2Actor.path().toString()).build();
// 500 ms isolated-leader check interval, presumably so the tests' wait for the IsolatedLeader transition
// completes promptly.
438 leaderConfigParams = newLeaderConfigParams();
439 leaderConfigParams.setIsolatedLeaderCheckInterval(Duration.ofMillis(500));
441 leaderNotifierActor = factory.createActor(MessageCollectorActor.props(),
442 factory.generateActorId(leaderId + "-notifier"));
444 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder()
445 .peerAddresses(peerAddresses).config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
447 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
448 follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
449 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
// Force leaderId to start an election, wait for it to lead and for two AppendEntriesReply messages
// (presumably one per follower), then clear every collector so the tests start from empty queues.
451 leaderActor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
452 waitUntilLeader(leaderActor);
454 expectMatching(leaderCollectorActor, AppendEntriesReply.class, 2);
456 clearMessages(leaderCollectorActor);
457 clearMessages(follower1CollectorActor);
458 clearMessages(follower2CollectorActor);
// currentTerm starts as the leader's term; forceElectionOnFollower1() later replaces it with follower1's.
460 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
461 currentTerm = leaderContext.currentTerm();
463 follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
464 follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
466 testLog.info("createRaftActors ending");