4847766f07bb26c023df6be4801a06a0f2bfcaa3
[controller.git] /
1 /*
2  * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
12 import static org.opendaylight.controller.cluster.raft.MessageCollectorActor.assertNoneMatching;
13 import static org.opendaylight.controller.cluster.raft.MessageCollectorActor.clearMessages;
14 import static org.opendaylight.controller.cluster.raft.MessageCollectorActor.expectFirstMatching;
15 import static org.opendaylight.controller.cluster.raft.MessageCollectorActor.expectMatching;
16 import static org.opendaylight.controller.cluster.raft.MessageCollectorActor.getAllMatching;
17
18 import com.google.common.collect.ImmutableMap;
19 import java.time.Duration;
20 import java.util.List;
21 import org.apache.pekko.actor.ActorRef;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.notifications.RoleChanged;
24 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
25 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
29 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
30 import org.opendaylight.raft.api.RaftRole;
31
32 /**
33  * Tests isolation of nodes end-to-end.
34  *
35  * @author Thomas Pantelis
36  */
37 public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
38     private ActorRef follower1NotifierActor;
39     private ActorRef leaderNotifierActor;
40
41     /**
42      * Isolates the leader after all initial payload entries have been committed and applied on all nodes. While
43      * isolated, the majority partition elects a new leader and both sides of the partition attempt to commit one entry
44      * independently. After isolation is removed, the entry will conflict and both sides should reconcile their logs
45      * appropriately.
46      */
47     @Test
48     public void testLeaderIsolationWithAllPriorEntriesCommitted() {
49         testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted starting");
50
51         createRaftActors();
52
53         // Send an initial payloads and verify replication.
54
55         final MockCommand payload0 = sendPayloadData(leaderActor, "zero");
56         final MockCommand payload1 = sendPayloadData(leaderActor, "one");
57         verifyApplyJournalEntries(leaderCollectorActor, 1);
58         verifyApplyJournalEntries(follower1CollectorActor, 1);
59         verifyApplyJournalEntries(follower2CollectorActor, 1);
60
61         isolateLeader();
62
63         // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
64
65         testLog.info("Sending payload to isolated leader");
66
67         final MockCommand isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
68
69         // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
70         // is collected but not forwarded to the follower RaftActor.
71
72         AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
73         assertEquals("getTerm", currentTerm, appendEntries.getTerm());
74         assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
75         assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
76         verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
77
78         // The leader should transition to IsolatedLeader.
79
80         expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.newRole().equals(RaftRole.IsolatedLeader));
81
82         forceElectionOnFollower1();
83
84         // Send a payload to the new leader follower1 with index 2 and verify it's replicated to follower2
85         // and committed.
86
87         testLog.info("Sending payload to new leader");
88
89         final MockCommand newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
90         verifyApplyJournalEntries(follower1CollectorActor, 2);
91         verifyApplyJournalEntries(follower2CollectorActor, 2);
92
93         final var follower1log = follower1Context.getReplicatedLog();
94         assertEquals("Follower 1 journal last term", currentTerm, follower1log.lastTerm());
95         assertEquals("Follower 1 journal last index", 2, follower1log.lastIndex());
96         assertEquals("Follower 1 commit index", 2, follower1log.getCommitIndex());
97         verifyReplicatedLogEntry(follower1log.get(2), currentTerm, 2, newLeaderPayload2);
98
99         assertEquals("Follower 1 state", List.of(payload0, payload1, newLeaderPayload2),
100                 follower1Actor.underlyingActor().getState());
101
102         removeIsolation();
103
104         // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
105         // with a higher term.
106
107         expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.newRole().equals(RaftRole.Follower));
108
109         // The previous leader has a conflicting log entry at index 2 with a different term which should get
110         // replaced by the new leader's index 1 entry.
111
112         verifyApplyJournalEntries(leaderCollectorActor, 2);
113
114         final var leaderLog = leaderContext.getReplicatedLog();
115         assertEquals("Prior leader journal last term", currentTerm, leaderLog.lastTerm());
116         assertEquals("Prior leader journal last index", 2, leaderLog.lastIndex());
117         assertEquals("Prior leader commit index", 2, leaderLog.getCommitIndex());
118         verifyReplicatedLogEntry(leaderLog.get(2), currentTerm, 2, newLeaderPayload2);
119
120         assertEquals("Prior leader state", List.of(payload0, payload1, newLeaderPayload2),
121                 leaderActor.underlyingActor().getState());
122
123         testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted ending");
124     }
125
126     /**
127      * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
128      * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
129      * partition attempt to commit one entry independently. After isolation is removed, the entry will conflict and both
130      * sides should reconcile their logs appropriately.
131      */
132     @Test
133     public void testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry() {
134         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry starting");
135
136         createRaftActors();
137
138         // Submit an initial payload that is committed/applied on all nodes.
139
140         final MockCommand payload0 = sendPayloadData(leaderActor, "zero");
141         verifyApplyJournalEntries(leaderCollectorActor, 0);
142         verifyApplyJournalEntries(follower1CollectorActor, 0);
143         verifyApplyJournalEntries(follower2CollectorActor, 0);
144
145         // Submit another payload that is replicated to all followers and committed on the leader but the leader is
146         // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
147         // with the updated leader commit index.
148
149         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
150         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
151
152         MockCommand payload1 = sendPayloadData(leaderActor, "one");
153
154         // Wait for the isolated leader to send AppendEntries to the followers with the new entry with index 1. This
155         // message is forwarded to the followers.
156
157         expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae ->
158                 ae.getEntries().size() == 1 && ae.getEntries().getFirst().index() == 1
159                         && ae.getEntries().getFirst().command().equals(payload1));
160
161         expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae ->
162                 ae.getEntries().size() == 1 && ae.getEntries().getFirst().index() == 1
163                         && ae.getEntries().getFirst().command().equals(payload1));
164
165         verifyApplyJournalEntries(leaderCollectorActor, 1);
166
167         isolateLeader();
168
169         // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
170
171         testLog.info("Sending payload to isolated leader");
172
173         final MockCommand isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
174
175         // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
176         // is collected but not forwarded to the follower RaftActor.
177
178         AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
179         assertEquals("getTerm", currentTerm, appendEntries.getTerm());
180         assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
181         assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
182         verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
183
184         // The leader should transition to IsolatedLeader.
185
186         expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.newRole().equals(RaftRole.IsolatedLeader));
187
188         forceElectionOnFollower1();
189
190         // Send a payload to the new leader follower1 and verify it's replicated to follower2 and committed. Since the
191         // entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
192         // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload will have index 3.
193
194         testLog.info("Sending payload to new leader");
195
196         final MockCommand newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
197         verifyApplyJournalEntries(follower1CollectorActor, 3);
198         verifyApplyJournalEntries(follower2CollectorActor, 3);
199
200         final var follower1log = follower1Context.getReplicatedLog();
201         assertEquals("Follower 1 journal last term", currentTerm, follower1log.lastTerm());
202         assertEquals("Follower 1 journal last index", 3, follower1log.lastIndex());
203         assertEquals("Follower 1 commit index", 3, follower1log.getCommitIndex());
204         verifyReplicatedLogEntry(follower1log.get(3), currentTerm, 3, newLeaderPayload2);
205
206         assertEquals("Follower 1 state", List.of(payload0, payload1, newLeaderPayload2),
207                 follower1Actor.underlyingActor().getState());
208
209         removeIsolation();
210
211         // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
212         // with a higher term.
213
214         expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.newRole().equals(RaftRole.Follower));
215
216         // The previous leader has a conflicting log entry at index 2 with a different term which should get
217         // replaced by the new leader's entry.
218
219         verifyApplyJournalEntries(leaderCollectorActor, 3);
220
221         verifyRaftState(leaderActor, raftState -> {
222             final var leaderLog = leaderContext.getReplicatedLog();
223             assertEquals("Prior leader journal last term", currentTerm, leaderLog.lastTerm());
224             assertEquals("Prior leader journal last index", 3, leaderLog.lastIndex());
225             assertEquals("Prior leader commit index", 3, leaderLog.getCommitIndex());
226         });
227
228         assertEquals("Prior leader state", List.of(payload0, payload1, newLeaderPayload2),
229                 leaderActor.underlyingActor().getState());
230
231         // Ensure the prior leader didn't apply its conflicting entry with index 2, term 1.
232
233         final var applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
234         for (var as : applyState) {
235             if (as.entry().index() == 2 && as.entry().term() == 1) {
236                 fail("Got unexpected ApplyState: " + as);
237             }
238         }
239
240         // The prior leader should not have needed a snapshot installed in order to get it synced.
241
242         assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
243
244         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry ending");
245     }
246
247     /**
248      * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
249      * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
250      * partition attempt to commit multiple entries independently. After isolation is removed, the entries will conflict
251      * and both sides should reconcile their logs appropriately.
252      */
253     @Test
254     public void testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries() {
255         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries starting");
256
257         createRaftActors();
258
259         // Submit an initial payload that is committed/applied on all nodes.
260
261         final MockCommand payload0 = sendPayloadData(leaderActor, "zero");
262         verifyApplyJournalEntries(leaderCollectorActor, 0);
263         verifyApplyJournalEntries(follower1CollectorActor, 0);
264         verifyApplyJournalEntries(follower2CollectorActor, 0);
265
266         // Submit another payload that is replicated to all followers and committed on the leader but the leader is
267         // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
268         // with the updated leader commit index.
269
270         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
271         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
272
273         MockCommand payload1 = sendPayloadData(leaderActor, "one");
274
275         // Wait for the isolated leader to send AppendEntries to the followers with the new entry with index 1. This
276         // message is forwarded to the followers.
277
278         expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae ->
279                 ae.getEntries().size() == 1 && ae.getEntries().getFirst().index() == 1
280                         && ae.getEntries().getFirst().command().equals(payload1));
281
282         expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae ->
283                 ae.getEntries().size() == 1 && ae.getEntries().getFirst().index() == 1
284                         && ae.getEntries().getFirst().command().equals(payload1));
285
286         verifyApplyJournalEntries(leaderCollectorActor, 1);
287
288         isolateLeader();
289
290         // Send 3 payloads to the isolated leader so it has uncommitted log entries.
291
292         testLog.info("Sending 3 payloads to isolated leader");
293
294         sendPayloadData(leaderActor, "two");
295         sendPayloadData(leaderActor, "three");
296         sendPayloadData(leaderActor, "four");
297
298         // Wait for the isolated leader to send AppendEntries to follower1 for each new entry. Note the messages
299         // are collected but not forwarded to the follower RaftActor.
300
301         expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
302             for (var entry : ae.getEntries()) {
303                 if (entry.index() == 4) {
304                     return true;
305                 }
306             }
307             return false;
308         });
309
310         // The leader should transition to IsolatedLeader.
311
312         expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.newRole().equals(RaftRole.IsolatedLeader));
313
314         forceElectionOnFollower1();
315
316         // Send 3 payloads to the new leader follower1 and verify they're replicated to follower2 and committed. Since
317         // the entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
318         // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload indices will start at 3.
319
320         testLog.info("Sending 3 payloads to new leader");
321
322         final var newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
323         final var newLeaderPayload3 = sendPayloadData(follower1Actor, "three-new");
324         final var newLeaderPayload4 = sendPayloadData(follower1Actor, "four-new");
325         verifyApplyJournalEntries(follower1CollectorActor, 5);
326         verifyApplyJournalEntries(follower2CollectorActor, 5);
327
328         final var follower1log = follower1Context.getReplicatedLog();
329         assertEquals("Follower 1 journal last term", currentTerm, follower1log.lastTerm());
330         assertEquals("Follower 1 journal last index", 5, follower1log.lastIndex());
331         assertEquals("Follower 1 commit index", 5, follower1log.getCommitIndex());
332         verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(5), currentTerm, 5, newLeaderPayload4);
333
334         assertEquals("Follower 1 state", List.of(payload0, payload1, newLeaderPayload2, newLeaderPayload3,
335                 newLeaderPayload4), follower1Actor.underlyingActor().getState());
336
337         removeIsolation();
338
339         // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
340         // with a higher term.
341
342         expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.newRole().equals(RaftRole.Follower));
343
344         // The previous leader has conflicting log entries starting at index 2 with different terms which should get
345         // replaced by the new leader's entries.
346
347         verifyApplyJournalEntries(leaderCollectorActor, 5);
348
349         verifyRaftState(leaderActor, raftState -> {
350             final var leaderLog = leaderContext.getReplicatedLog();
351             assertEquals("Prior leader journal last term", currentTerm, leaderLog.lastTerm());
352             assertEquals("Prior leader journal last index", 5, leaderLog.lastIndex());
353             assertEquals("Prior leader commit index", 5, leaderLog.getCommitIndex());
354         });
355
356         assertEquals("Prior leader state",
357             List.of(payload0, payload1, newLeaderPayload2, newLeaderPayload3, newLeaderPayload4),
358             leaderActor.underlyingActor().getState());
359
360         // Ensure the prior leader didn't apply any of its conflicting entries with term 1.
361
362         final var applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
363         for (var as : applyState) {
364             if (as.entry().term() == 1) {
365                 fail("Got unexpected ApplyState: " + as);
366             }
367         }
368
369         // The prior leader should not have needed a snapshot installed in order to get it synced.
370
371         assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
372
373         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries ending");
374     }
375
376     private void removeIsolation() {
377         testLog.info("Removing isolation");
378
379         clearMessages(leaderNotifierActor);
380         clearMessages(leaderCollectorActor);
381
382         leaderActor.underlyingActor().stopDropMessages(AppendEntries.class);
383         leaderActor.underlyingActor().stopDropMessages(RequestVote.class);
384         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
385         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
386     }
387
388     private void forceElectionOnFollower1() {
389         // Force follower1 to start an election. follower2 should grant the vote.
390
391         testLog.info("Forcing election on {}", follower1Id);
392
393         follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
394
395         expectFirstMatching(follower1NotifierActor, RoleChanged.class, rc -> rc.newRole().equals(RaftRole.Leader));
396
397         currentTerm = follower1Context.currentTerm();
398     }
399
400     private void isolateLeader() {
401         // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
402
403         testLog.info("Isolating the leader");
404
405         leaderActor.underlyingActor().startDropMessages(AppendEntries.class);
406         leaderActor.underlyingActor().startDropMessages(RequestVote.class);
407
408         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class,
409             ae -> ae.getLeaderId().equals(leaderId));
410         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class,
411             ae -> ae.getLeaderId().equals(leaderId));
412
413         clearMessages(follower1CollectorActor);
414         clearMessages(follower1NotifierActor);
415         clearMessages(leaderNotifierActor);
416     }
417
418     private void createRaftActors() {
419         testLog.info("createRaftActors starting");
420
421         follower1NotifierActor = factory.createActor(MessageCollectorActor.props(),
422                 factory.generateActorId(follower1Id + "-notifier"));
423
424         DefaultConfigParamsImpl followerConfigParams = new DefaultConfigParamsImpl();
425         followerConfigParams.setHeartBeatInterval(Duration.ofMillis(100));
426         followerConfigParams.setElectionTimeoutFactor(1000);
427         follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder()
428             .peerAddresses(ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id)))
429                 .config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
430
431         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
432                 follower1Id, testActorPath(follower1Id)), followerConfigParams);
433
434         peerAddresses = ImmutableMap.<String, String>builder()
435                 .put(follower1Id, follower1Actor.path().toString())
436                 .put(follower2Id, follower2Actor.path().toString()).build();
437
438         leaderConfigParams = newLeaderConfigParams();
439         leaderConfigParams.setIsolatedLeaderCheckInterval(Duration.ofMillis(500));
440
441         leaderNotifierActor = factory.createActor(MessageCollectorActor.props(),
442                 factory.generateActorId(leaderId + "-notifier"));
443
444         leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder()
445             .peerAddresses(peerAddresses).config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
446
447         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
448         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
449         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
450
451         leaderActor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
452         waitUntilLeader(leaderActor);
453
454         expectMatching(leaderCollectorActor, AppendEntriesReply.class, 2);
455
456         clearMessages(leaderCollectorActor);
457         clearMessages(follower1CollectorActor);
458         clearMessages(follower2CollectorActor);
459
460         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
461         currentTerm = leaderContext.currentTerm();
462
463         follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
464         follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
465
466         testLog.info("createRaftActors ending");
467     }
468 }