39f90d846070d0eb4e30255dde82691112c2140b
[controller.git] /
1 /*
2  * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.getAllMatching;
17
18 import com.google.common.collect.ImmutableMap;
19 import java.time.Duration;
20 import java.util.List;
21 import org.apache.pekko.actor.ActorRef;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.notifications.RoleChanged;
24 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
25 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
26 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
28 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
29 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
30 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
31 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
32
33 /**
34  * Tests isolation of nodes end-to-end.
35  *
36  * @author Thomas Pantelis
37  */
38 public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
39     private ActorRef follower1NotifierActor;
40     private ActorRef leaderNotifierActor;
41
42     /**
43      * Isolates the leader after all initial payload entries have been committed and applied on all nodes. While
44      * isolated, the majority partition elects a new leader and both sides of the partition attempt to commit one entry
45      * independently. After isolation is removed, the entry will conflict and both sides should reconcile their logs
46      * appropriately.
47      */
48     @Test
49     public void testLeaderIsolationWithAllPriorEntriesCommitted() {
50         testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted starting");
51
52         createRaftActors();
53
54         // Send an initial payloads and verify replication.
55
56         final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
57         final MockPayload payload1 = sendPayloadData(leaderActor, "one");
58         verifyApplyJournalEntries(leaderCollectorActor, 1);
59         verifyApplyJournalEntries(follower1CollectorActor, 1);
60         verifyApplyJournalEntries(follower2CollectorActor, 1);
61
62         isolateLeader();
63
64         // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
65
66         testLog.info("Sending payload to isolated leader");
67
68         final MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
69
70         // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
71         // is collected but not forwarded to the follower RaftActor.
72
73         AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
74         assertEquals("getTerm", currentTerm, appendEntries.getTerm());
75         assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
76         assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
77         verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
78
79         // The leader should transition to IsolatedLeader.
80
81         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
82             rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
83
84         forceElectionOnFollower1();
85
86         // Send a payload to the new leader follower1 with index 2 and verify it's replicated to follower2
87         // and committed.
88
89         testLog.info("Sending payload to new leader");
90
91         final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
92         verifyApplyJournalEntries(follower1CollectorActor, 2);
93         verifyApplyJournalEntries(follower2CollectorActor, 2);
94
95         final var follower1log = follower1Context.getReplicatedLog();
96         assertEquals("Follower 1 journal last term", currentTerm, follower1log.lastTerm());
97         assertEquals("Follower 1 journal last index", 2, follower1log.lastIndex());
98         assertEquals("Follower 1 commit index", 2, follower1log.getCommitIndex());
99         verifyReplicatedLogEntry(follower1log.get(2), currentTerm, 2, newLeaderPayload2);
100
101         assertEquals("Follower 1 state", List.of(payload0, payload1, newLeaderPayload2),
102                 follower1Actor.underlyingActor().getState());
103
104         removeIsolation();
105
106         // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
107         // with a higher term.
108
109         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
110             rc -> rc.getNewRole().equals(RaftState.Follower.name()));
111
112         // The previous leader has a conflicting log entry at index 2 with a different term which should get
113         // replaced by the new leader's index 1 entry.
114
115         verifyApplyJournalEntries(leaderCollectorActor, 2);
116
117         final var leaderLog = leaderContext.getReplicatedLog();
118         assertEquals("Prior leader journal last term", currentTerm, leaderLog.lastTerm());
119         assertEquals("Prior leader journal last index", 2, leaderLog.lastIndex());
120         assertEquals("Prior leader commit index", 2, leaderLog.getCommitIndex());
121         verifyReplicatedLogEntry(leaderLog.get(2), currentTerm, 2, newLeaderPayload2);
122
123         assertEquals("Prior leader state", List.of(payload0, payload1, newLeaderPayload2),
124                 leaderActor.underlyingActor().getState());
125
126         testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted ending");
127     }
128
129     /**
130      * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
131      * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
132      * partition attempt to commit one entry independently. After isolation is removed, the entry will conflict and both
133      * sides should reconcile their logs appropriately.
134      */
135     @Test
136     public void testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry() {
137         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry starting");
138
139         createRaftActors();
140
141         // Submit an initial payload that is committed/applied on all nodes.
142
143         final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
144         verifyApplyJournalEntries(leaderCollectorActor, 0);
145         verifyApplyJournalEntries(follower1CollectorActor, 0);
146         verifyApplyJournalEntries(follower2CollectorActor, 0);
147
148         // Submit another payload that is replicated to all followers and committed on the leader but the leader is
149         // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
150         // with the updated leader commit index.
151
152         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
153         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
154
155         MockPayload payload1 = sendPayloadData(leaderActor, "one");
156
157         // Wait for the isolated leader to send AppendEntries to the followers with the new entry with index 1. This
158         // message is forwarded to the followers.
159
160         expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae ->
161                 ae.getEntries().size() == 1 && ae.getEntries().get(0).index() == 1
162                         && ae.getEntries().get(0).getData().equals(payload1));
163
164         expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae ->
165                 ae.getEntries().size() == 1 && ae.getEntries().get(0).index() == 1
166                         && ae.getEntries().get(0).getData().equals(payload1));
167
168         verifyApplyJournalEntries(leaderCollectorActor, 1);
169
170         isolateLeader();
171
172         // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
173
174         testLog.info("Sending payload to isolated leader");
175
176         final MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
177
178         // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
179         // is collected but not forwarded to the follower RaftActor.
180
181         AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
182         assertEquals("getTerm", currentTerm, appendEntries.getTerm());
183         assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
184         assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
185         verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
186
187         // The leader should transition to IsolatedLeader.
188
189         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
190             rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
191
192         forceElectionOnFollower1();
193
194         // Send a payload to the new leader follower1 and verify it's replicated to follower2 and committed. Since the
195         // entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
196         // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload will have index 3.
197
198         testLog.info("Sending payload to new leader");
199
200         final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
201         verifyApplyJournalEntries(follower1CollectorActor, 3);
202         verifyApplyJournalEntries(follower2CollectorActor, 3);
203
204         final var follower1log = follower1Context.getReplicatedLog();
205         assertEquals("Follower 1 journal last term", currentTerm, follower1log.lastTerm());
206         assertEquals("Follower 1 journal last index", 3, follower1log.lastIndex());
207         assertEquals("Follower 1 commit index", 3, follower1log.getCommitIndex());
208         verifyReplicatedLogEntry(follower1log.get(3), currentTerm, 3, newLeaderPayload2);
209
210         assertEquals("Follower 1 state", List.of(payload0, payload1, newLeaderPayload2),
211                 follower1Actor.underlyingActor().getState());
212
213         removeIsolation();
214
215         // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
216         // with a higher term.
217
218         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
219             rc -> rc.getNewRole().equals(RaftState.Follower.name()));
220
221         // The previous leader has a conflicting log entry at index 2 with a different term which should get
222         // replaced by the new leader's entry.
223
224         verifyApplyJournalEntries(leaderCollectorActor, 3);
225
226         verifyRaftState(leaderActor, raftState -> {
227             final var leaderLog = leaderContext.getReplicatedLog();
228             assertEquals("Prior leader journal last term", currentTerm, leaderLog.lastTerm());
229             assertEquals("Prior leader journal last index", 3, leaderLog.lastIndex());
230             assertEquals("Prior leader commit index", 3, leaderLog.getCommitIndex());
231         });
232
233         assertEquals("Prior leader state", List.of(payload0, payload1, newLeaderPayload2),
234                 leaderActor.underlyingActor().getState());
235
236         // Ensure the prior leader didn't apply its conflicting entry with index 2, term 1.
237
238         List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
239         for (ApplyState as: applyState) {
240             if (as.getReplicatedLogEntry().index() == 2 && as.getReplicatedLogEntry().term() == 1) {
241                 fail("Got unexpected ApplyState: " + as);
242             }
243         }
244
245         // The prior leader should not have needed a snapshot installed in order to get it synced.
246
247         assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
248
249         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry ending");
250     }
251
252     /**
253      * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
254      * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
255      * partition attempt to commit multiple entries independently. After isolation is removed, the entries will conflict
256      * and both sides should reconcile their logs appropriately.
257      */
258     @Test
259     public void testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries() {
260         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries starting");
261
262         createRaftActors();
263
264         // Submit an initial payload that is committed/applied on all nodes.
265
266         final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
267         verifyApplyJournalEntries(leaderCollectorActor, 0);
268         verifyApplyJournalEntries(follower1CollectorActor, 0);
269         verifyApplyJournalEntries(follower2CollectorActor, 0);
270
271         // Submit another payload that is replicated to all followers and committed on the leader but the leader is
272         // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
273         // with the updated leader commit index.
274
275         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
276         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
277
278         MockPayload payload1 = sendPayloadData(leaderActor, "one");
279
280         // Wait for the isolated leader to send AppendEntries to the followers with the new entry with index 1. This
281         // message is forwarded to the followers.
282
283         expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae ->
284                 ae.getEntries().size() == 1 && ae.getEntries().get(0).index() == 1
285                         && ae.getEntries().get(0).getData().equals(payload1));
286
287         expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae ->
288                 ae.getEntries().size() == 1 && ae.getEntries().get(0).index() == 1
289                         && ae.getEntries().get(0).getData().equals(payload1));
290
291         verifyApplyJournalEntries(leaderCollectorActor, 1);
292
293         isolateLeader();
294
295         // Send 3 payloads to the isolated leader so it has uncommitted log entries.
296
297         testLog.info("Sending 3 payloads to isolated leader");
298
299         sendPayloadData(leaderActor, "two");
300         sendPayloadData(leaderActor, "three");
301         sendPayloadData(leaderActor, "four");
302
303         // Wait for the isolated leader to send AppendEntries to follower1 for each new entry. Note the messages
304         // are collected but not forwarded to the follower RaftActor.
305
306         expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
307             for (var entry : ae.getEntries()) {
308                 if (entry.index() == 4) {
309                     return true;
310                 }
311             }
312             return false;
313         });
314
315         // The leader should transition to IsolatedLeader.
316
317         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
318             rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
319
320         forceElectionOnFollower1();
321
322         // Send 3 payloads to the new leader follower1 and verify they're replicated to follower2 and committed. Since
323         // the entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
324         // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload indices will start at 3.
325
326         testLog.info("Sending 3 payloads to new leader");
327
328         final var newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
329         final var newLeaderPayload3 = sendPayloadData(follower1Actor, "three-new");
330         final var newLeaderPayload4 = sendPayloadData(follower1Actor, "four-new");
331         verifyApplyJournalEntries(follower1CollectorActor, 5);
332         verifyApplyJournalEntries(follower2CollectorActor, 5);
333
334         final var follower1log = follower1Context.getReplicatedLog();
335         assertEquals("Follower 1 journal last term", currentTerm, follower1log.lastTerm());
336         assertEquals("Follower 1 journal last index", 5, follower1log.lastIndex());
337         assertEquals("Follower 1 commit index", 5, follower1log.getCommitIndex());
338         verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(5), currentTerm, 5, newLeaderPayload4);
339
340         assertEquals("Follower 1 state", List.of(payload0, payload1, newLeaderPayload2, newLeaderPayload3,
341                 newLeaderPayload4), follower1Actor.underlyingActor().getState());
342
343         removeIsolation();
344
345         // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
346         // with a higher term.
347
348         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
349             rc -> rc.getNewRole().equals(RaftState.Follower.name()));
350
351         // The previous leader has conflicting log entries starting at index 2 with different terms which should get
352         // replaced by the new leader's entries.
353
354         verifyApplyJournalEntries(leaderCollectorActor, 5);
355
356         verifyRaftState(leaderActor, raftState -> {
357             final var leaderLog = leaderContext.getReplicatedLog();
358             assertEquals("Prior leader journal last term", currentTerm, leaderLog.lastTerm());
359             assertEquals("Prior leader journal last index", 5, leaderLog.lastIndex());
360             assertEquals("Prior leader commit index", 5, leaderLog.getCommitIndex());
361         });
362
363         assertEquals("Prior leader state",
364             List.of(payload0, payload1, newLeaderPayload2, newLeaderPayload3, newLeaderPayload4),
365             leaderActor.underlyingActor().getState());
366
367         // Ensure the prior leader didn't apply any of its conflicting entries with term 1.
368
369         List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
370         for (ApplyState as: applyState) {
371             if (as.getReplicatedLogEntry().term() == 1) {
372                 fail("Got unexpected ApplyState: " + as);
373             }
374         }
375
376         // The prior leader should not have needed a snapshot installed in order to get it synced.
377
378         assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
379
380         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries ending");
381     }
382
383     private void removeIsolation() {
384         testLog.info("Removing isolation");
385
386         clearMessages(leaderNotifierActor);
387         clearMessages(leaderCollectorActor);
388
389         leaderActor.underlyingActor().stopDropMessages(AppendEntries.class);
390         leaderActor.underlyingActor().stopDropMessages(RequestVote.class);
391         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
392         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
393     }
394
395     private void forceElectionOnFollower1() {
396         // Force follower1 to start an election. follower2 should grant the vote.
397
398         testLog.info("Forcing election on {}", follower1Id);
399
400         follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
401
402         expectFirstMatching(follower1NotifierActor, RoleChanged.class,
403             rc -> rc.getNewRole().equals(RaftState.Leader.name()));
404
405         currentTerm = follower1Context.currentTerm();
406     }
407
408     private void isolateLeader() {
409         // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
410
411         testLog.info("Isolating the leader");
412
413         leaderActor.underlyingActor().startDropMessages(AppendEntries.class);
414         leaderActor.underlyingActor().startDropMessages(RequestVote.class);
415
416         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class,
417             ae -> ae.getLeaderId().equals(leaderId));
418         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class,
419             ae -> ae.getLeaderId().equals(leaderId));
420
421         clearMessages(follower1CollectorActor);
422         clearMessages(follower1NotifierActor);
423         clearMessages(leaderNotifierActor);
424     }
425
426     private void createRaftActors() {
427         testLog.info("createRaftActors starting");
428
429         follower1NotifierActor = factory.createActor(MessageCollectorActor.props(),
430                 factory.generateActorId(follower1Id + "-notifier"));
431
432         DefaultConfigParamsImpl followerConfigParams = new DefaultConfigParamsImpl();
433         followerConfigParams.setHeartBeatInterval(Duration.ofMillis(100));
434         followerConfigParams.setElectionTimeoutFactor(1000);
435         follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder()
436             .peerAddresses(ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id)))
437                 .config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
438
439         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
440                 follower1Id, testActorPath(follower1Id)), followerConfigParams);
441
442         peerAddresses = ImmutableMap.<String, String>builder()
443                 .put(follower1Id, follower1Actor.path().toString())
444                 .put(follower2Id, follower2Actor.path().toString()).build();
445
446         leaderConfigParams = newLeaderConfigParams();
447         leaderConfigParams.setIsolatedLeaderCheckInterval(Duration.ofMillis(500));
448
449         leaderNotifierActor = factory.createActor(MessageCollectorActor.props(),
450                 factory.generateActorId(leaderId + "-notifier"));
451
452         leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder()
453             .peerAddresses(peerAddresses).config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
454
455         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
456         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
457         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
458
459         leaderActor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
460         waitUntilLeader(leaderActor);
461
462         expectMatching(leaderCollectorActor, AppendEntriesReply.class, 2);
463
464         clearMessages(leaderCollectorActor);
465         clearMessages(follower1CollectorActor);
466         clearMessages(follower2CollectorActor);
467
468         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
469         currentTerm = leaderContext.currentTerm();
470
471         follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
472         follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
473
474         testLog.info("createRaftActors ending");
475     }
476 }