Migrate sal-akka-raft tests
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / IsolationScenarioTest.java
1 /*
2  * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.getAllMatching;
17
18 import akka.actor.ActorRef;
19 import com.google.common.collect.ImmutableMap;
20 import com.google.common.collect.Lists;
21 import java.util.List;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.notifications.RoleChanged;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
26 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
27 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
28 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
29 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
30 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
31 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
32 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
33 import scala.concurrent.duration.FiniteDuration;
34
35 /**
36  * Tests isolation of nodes end-to-end.
37  *
38  * @author Thomas Pantelis
39  */
40 public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
41     private ActorRef follower1NotifierActor;
42     private ActorRef leaderNotifierActor;
43
44     /**
45      * Isolates the leader after all initial payload entries have been committed and applied on all nodes. While
46      * isolated, the majority partition elects a new leader and both sides of the partition attempt to commit one entry
47      * independently. After isolation is removed, the entry will conflict and both sides should reconcile their logs
48      * appropriately.
49      */
50     @Test
51     public void testLeaderIsolationWithAllPriorEntriesCommitted() {
52         testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted starting");
53
54         createRaftActors();
55
56         // Send an initial payloads and verify replication.
57
58         final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
59         final  MockPayload payload1 = sendPayloadData(leaderActor, "one");
60         verifyApplyJournalEntries(leaderCollectorActor, 1);
61         verifyApplyJournalEntries(follower1CollectorActor, 1);
62         verifyApplyJournalEntries(follower2CollectorActor, 1);
63
64         isolateLeader();
65
66         // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
67
68         testLog.info("Sending payload to isolated leader");
69
70         final MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
71
72         // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
73         // is collected but not forwarded to the follower RaftActor.
74
75         AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
76         assertEquals("getTerm", currentTerm, appendEntries.getTerm());
77         assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
78         assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
79         verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
80
81         // The leader should transition to IsolatedLeader.
82
83         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
84             rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
85
86         forceElectionOnFollower1();
87
88         // Send a payload to the new leader follower1 with index 2 and verify it's replicated to follower2
89         // and committed.
90
91         testLog.info("Sending payload to new leader");
92
93         final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
94         verifyApplyJournalEntries(follower1CollectorActor, 2);
95         verifyApplyJournalEntries(follower2CollectorActor, 2);
96
97         assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
98         assertEquals("Follower 1 journal last index", 2, follower1Context.getReplicatedLog().lastIndex());
99         assertEquals("Follower 1 commit index", 2, follower1Context.getCommitIndex());
100         verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(2), currentTerm, 2, newLeaderPayload2);
101
102         assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
103                 follower1Actor.underlyingActor().getState());
104
105         removeIsolation();
106
107         // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
108         // with a higher term.
109
110         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
111             rc -> rc.getNewRole().equals(RaftState.Follower.name()));
112
113         // The previous leader has a conflicting log entry at index 2 with a different term which should get
114         // replaced by the new leader's index 1 entry.
115
116         verifyApplyJournalEntries(leaderCollectorActor, 2);
117
118         assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
119         assertEquals("Prior leader journal last index", 2, leaderContext.getReplicatedLog().lastIndex());
120         assertEquals("Prior leader commit index", 2, leaderContext.getCommitIndex());
121         verifyReplicatedLogEntry(leaderContext.getReplicatedLog().get(2), currentTerm, 2, newLeaderPayload2);
122
123         assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
124                 leaderActor.underlyingActor().getState());
125
126         testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted ending");
127     }
128
129     /**
130      * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
131      * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
132      * partition attempt to commit one entry independently. After isolation is removed, the entry will conflict and both
133      * sides should reconcile their logs appropriately.
134      */
135     @Test
136     public void testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry() {
137         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry starting");
138
139         createRaftActors();
140
141         // Submit an initial payload that is committed/applied on all nodes.
142
143         final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
144         verifyApplyJournalEntries(leaderCollectorActor, 0);
145         verifyApplyJournalEntries(follower1CollectorActor, 0);
146         verifyApplyJournalEntries(follower2CollectorActor, 0);
147
148         // Submit another payload that is replicated to all followers and committed on the leader but the leader is
149         // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
150         // with the updated leader commit index.
151
152         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
153         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
154
155         MockPayload payload1 = sendPayloadData(leaderActor, "one");
156
157         // Wait for the isolated leader to send AppendEntries to the followers with the new entry with index 1. This
158         // message is forwarded to the followers.
159
160         expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae ->
161                 ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
162                         && ae.getEntries().get(0).getData().equals(payload1));
163
164         expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae ->
165                 ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
166                         && ae.getEntries().get(0).getData().equals(payload1));
167
168         verifyApplyJournalEntries(leaderCollectorActor, 1);
169
170         isolateLeader();
171
172         // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
173
174         testLog.info("Sending payload to isolated leader");
175
176         final MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
177
178         // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
179         // is collected but not forwarded to the follower RaftActor.
180
181         AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
182         assertEquals("getTerm", currentTerm, appendEntries.getTerm());
183         assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
184         assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
185         verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
186
187         // The leader should transition to IsolatedLeader.
188
189         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
190             rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
191
192         forceElectionOnFollower1();
193
194         // Send a payload to the new leader follower1 and verify it's replicated to follower2 and committed. Since the
195         // entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
196         // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload will have index 3.
197
198         testLog.info("Sending payload to new leader");
199
200         final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
201         verifyApplyJournalEntries(follower1CollectorActor, 3);
202         verifyApplyJournalEntries(follower2CollectorActor, 3);
203
204         assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
205         assertEquals("Follower 1 journal last index", 3, follower1Context.getReplicatedLog().lastIndex());
206         assertEquals("Follower 1 commit index", 3, follower1Context.getCommitIndex());
207         verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(3), currentTerm, 3, newLeaderPayload2);
208
209         assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
210                 follower1Actor.underlyingActor().getState());
211
212         removeIsolation();
213
214         // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
215         // with a higher term.
216
217         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
218             rc -> rc.getNewRole().equals(RaftState.Follower.name()));
219
220         // The previous leader has a conflicting log entry at index 2 with a different term which should get
221         // replaced by the new leader's entry.
222
223         verifyApplyJournalEntries(leaderCollectorActor, 3);
224
225         verifyRaftState(leaderActor, raftState -> {
226             assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
227             assertEquals("Prior leader journal last index", 3, leaderContext.getReplicatedLog().lastIndex());
228             assertEquals("Prior leader commit index", 3, leaderContext.getCommitIndex());
229         });
230
231         assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
232                 leaderActor.underlyingActor().getState());
233
234         // Ensure the prior leader didn't apply its conflicting entry with index 2, term 1.
235
236         List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
237         for (ApplyState as: applyState) {
238             if (as.getReplicatedLogEntry().getIndex() == 2 && as.getReplicatedLogEntry().getTerm() == 1) {
239                 fail("Got unexpected ApplyState: " + as);
240             }
241         }
242
243         // The prior leader should not have needed a snapshot installed in order to get it synced.
244
245         assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
246
247         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry ending");
248     }
249
250     /**
251      * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
252      * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
253      * partition attempt to commit multiple entries independently. After isolation is removed, the entries will conflict
254      * and both sides should reconcile their logs appropriately.
255      */
256     @Test
257     public void testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries() {
258         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries starting");
259
260         createRaftActors();
261
262         // Submit an initial payload that is committed/applied on all nodes.
263
264         final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
265         verifyApplyJournalEntries(leaderCollectorActor, 0);
266         verifyApplyJournalEntries(follower1CollectorActor, 0);
267         verifyApplyJournalEntries(follower2CollectorActor, 0);
268
269         // Submit another payload that is replicated to all followers and committed on the leader but the leader is
270         // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
271         // with the updated leader commit index.
272
273         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
274         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
275
276         MockPayload payload1 = sendPayloadData(leaderActor, "one");
277
278         // Wait for the isolated leader to send AppendEntries to the followers with the new entry with index 1. This
279         // message is forwarded to the followers.
280
281         expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae ->
282                 ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
283                         && ae.getEntries().get(0).getData().equals(payload1));
284
285         expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae ->
286                 ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
287                         && ae.getEntries().get(0).getData().equals(payload1));
288
289         verifyApplyJournalEntries(leaderCollectorActor, 1);
290
291         isolateLeader();
292
293         // Send 3 payloads to the isolated leader so it has uncommitted log entries.
294
295         testLog.info("Sending 3 payloads to isolated leader");
296
297         sendPayloadData(leaderActor, "two");
298         sendPayloadData(leaderActor, "three");
299         sendPayloadData(leaderActor, "four");
300
301         // Wait for the isolated leader to send AppendEntries to follower1 for each new entry. Note the messages
302         // are collected but not forwarded to the follower RaftActor.
303
304         expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
305             for (ReplicatedLogEntry e: ae.getEntries()) {
306                 if (e.getIndex() == 4) {
307                     return true;
308                 }
309             }
310             return false;
311         });
312
313         // The leader should transition to IsolatedLeader.
314
315         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
316             rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
317
318         forceElectionOnFollower1();
319
320         // Send 3 payloads to the new leader follower1 and verify they're replicated to follower2 and committed. Since
321         // the entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
322         // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload indices will start at 3.
323
324         testLog.info("Sending 3 payloads to new leader");
325
326         final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
327         final MockPayload newLeaderPayload3 = sendPayloadData(follower1Actor, "three-new");
328         final MockPayload newLeaderPayload4 = sendPayloadData(follower1Actor, "four-new");
329         verifyApplyJournalEntries(follower1CollectorActor, 5);
330         verifyApplyJournalEntries(follower2CollectorActor, 5);
331
332         assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
333         assertEquals("Follower 1 journal last index", 5, follower1Context.getReplicatedLog().lastIndex());
334         assertEquals("Follower 1 commit index", 5, follower1Context.getCommitIndex());
335         verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(5), currentTerm, 5, newLeaderPayload4);
336
337         assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2, newLeaderPayload3,
338                 newLeaderPayload4), follower1Actor.underlyingActor().getState());
339
340         removeIsolation();
341
342         // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
343         // with a higher term.
344
345         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
346             rc -> rc.getNewRole().equals(RaftState.Follower.name()));
347
348         // The previous leader has conflicting log entries starting at index 2 with different terms which should get
349         // replaced by the new leader's entries.
350
351         verifyApplyJournalEntries(leaderCollectorActor, 5);
352
353         verifyRaftState(leaderActor, raftState -> {
354             assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
355             assertEquals("Prior leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
356             assertEquals("Prior leader commit index", 5, leaderContext.getCommitIndex());
357         });
358
359         assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2, newLeaderPayload3,
360                 newLeaderPayload4), leaderActor.underlyingActor().getState());
361
362         // Ensure the prior leader didn't apply any of its conflicting entries with term 1.
363
364         List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
365         for (ApplyState as: applyState) {
366             if (as.getReplicatedLogEntry().getTerm() == 1) {
367                 fail("Got unexpected ApplyState: " + as);
368             }
369         }
370
371         // The prior leader should not have needed a snapshot installed in order to get it synced.
372
373         assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
374
375         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries ending");
376     }
377
378     private void removeIsolation() {
379         testLog.info("Removing isolation");
380
381         clearMessages(leaderNotifierActor);
382         clearMessages(leaderCollectorActor);
383
384         leaderActor.underlyingActor().stopDropMessages(AppendEntries.class);
385         leaderActor.underlyingActor().stopDropMessages(RequestVote.class);
386         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
387         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
388     }
389
390     private void forceElectionOnFollower1() {
391         // Force follower1 to start an election. follower2 should grant the vote.
392
393         testLog.info("Forcing election on {}", follower1Id);
394
395         follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
396
397         expectFirstMatching(follower1NotifierActor, RoleChanged.class,
398             rc -> rc.getNewRole().equals(RaftState.Leader.name()));
399
400         currentTerm = follower1Context.getTermInformation().getCurrentTerm();
401     }
402
403     private void isolateLeader() {
404         // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
405
406         testLog.info("Isolating the leader");
407
408         leaderActor.underlyingActor().startDropMessages(AppendEntries.class);
409         leaderActor.underlyingActor().startDropMessages(RequestVote.class);
410
411         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class,
412             ae -> ae.getLeaderId().equals(leaderId));
413         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class,
414             ae -> ae.getLeaderId().equals(leaderId));
415
416         clearMessages(follower1CollectorActor);
417         clearMessages(follower1NotifierActor);
418         clearMessages(leaderNotifierActor);
419     }
420
421     private void createRaftActors() {
422         testLog.info("createRaftActors starting");
423
424         follower1NotifierActor = factory.createActor(MessageCollectorActor.props(),
425                 factory.generateActorId(follower1Id + "-notifier"));
426
427         DefaultConfigParamsImpl followerConfigParams = new DefaultConfigParamsImpl();
428         followerConfigParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
429         followerConfigParams.setElectionTimeoutFactor(1000);
430         follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
431                 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id)))
432                 .config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
433
434         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
435                 follower1Id, testActorPath(follower1Id)), followerConfigParams);
436
437         peerAddresses = ImmutableMap.<String, String>builder()
438                 .put(follower1Id, follower1Actor.path().toString())
439                 .put(follower2Id, follower2Actor.path().toString()).build();
440
441         leaderConfigParams = newLeaderConfigParams();
442         leaderConfigParams.setIsolatedLeaderCheckInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
443
444         leaderNotifierActor = factory.createActor(MessageCollectorActor.props(),
445                 factory.generateActorId(leaderId + "-notifier"));
446
447         leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
448                 .config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
449
450         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
451         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
452         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
453
454         leaderActor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
455         waitUntilLeader(leaderActor);
456
457         expectMatching(leaderCollectorActor, AppendEntriesReply.class, 2);
458
459
460         clearMessages(leaderCollectorActor);
461         clearMessages(follower1CollectorActor);
462         clearMessages(follower2CollectorActor);
463
464         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
465         currentTerm = leaderContext.getTermInformation().getCurrentTerm();
466
467         follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
468         follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
469
470         testLog.info("createRaftActors ending");
471     }
472 }