sal-akka-raft: use lambdas
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / IsolationScenarioTest.java
1 /*
2  * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.getAllMatching;
17
18 import akka.actor.Actor;
19 import akka.actor.ActorRef;
20 import akka.actor.Props;
21 import akka.testkit.TestActorRef;
22 import com.google.common.collect.ImmutableMap;
23 import com.google.common.collect.Lists;
24 import java.util.List;
25 import java.util.concurrent.TimeUnit;
26 import org.junit.Test;
27 import org.opendaylight.controller.cluster.notifications.RoleChanged;
28 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
30 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
31 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
32 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
33 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
34 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
35 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
36 import scala.concurrent.duration.FiniteDuration;
37
38 /**
39  * Tests isolation of nodes end-to-end.
40  *
41  * @author Thomas Pantelis
42  */
43 public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
44     private TestActorRef<Actor> follower1NotifierActor;
45     private TestActorRef<Actor> leaderNotifierActor;
46
47     /**
48      * Isolates the leader after all initial payload entries have been committed and applied on all nodes. While
49      * isolated, the majority partition elects a new leader and both sides of the partition attempt to commit one entry
50      * independently. After isolation is removed, the entry will conflict and both sides should reconcile their logs
51      * appropriately.
52      */
53     @Test
54     public void testLeaderIsolationWithAllPriorEntriesCommitted() throws Exception {
55         testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted starting");
56
57         createRaftActors();
58
59         // Send an initial payloads and verify replication.
60
61         final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
62         final  MockPayload payload1 = sendPayloadData(leaderActor, "one");
63         verifyApplyJournalEntries(leaderCollectorActor, 1);
64         verifyApplyJournalEntries(follower1CollectorActor, 1);
65         verifyApplyJournalEntries(follower2CollectorActor, 1);
66
67         isolateLeader();
68
69         // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
70
71         testLog.info("Sending payload to isolated leader");
72
73         final MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
74
75         // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
76         // is collected but not forwarded to the follower RaftActor.
77
78         AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
79         assertEquals("getTerm", currentTerm, appendEntries.getTerm());
80         assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
81         assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
82         verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
83
84         // The leader should transition to IsolatedLeader.
85
86         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
87             rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
88
89         forceElectionOnFollower1();
90
91         // Send a payload to the new leader follower1 with index 2 and verify it's replicated to follower2
92         // and committed.
93
94         testLog.info("Sending payload to new leader");
95
96         final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
97         verifyApplyJournalEntries(follower1CollectorActor, 2);
98         verifyApplyJournalEntries(follower2CollectorActor, 2);
99
100         assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
101         assertEquals("Follower 1 journal last index", 2, follower1Context.getReplicatedLog().lastIndex());
102         assertEquals("Follower 1 commit index", 2, follower1Context.getCommitIndex());
103         verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(2), currentTerm, 2, newLeaderPayload2);
104
105         assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
106                 follower1Actor.underlyingActor().getState());
107
108         removeIsolation();
109
110         // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
111         // with a higher term.
112
113         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
114             rc -> rc.getNewRole().equals(RaftState.Follower.name()));
115
116         // The previous leader has a conflicting log entry at index 2 with a different term which should get
117         // replaced by the new leader's index 1 entry.
118
119         verifyApplyJournalEntries(leaderCollectorActor, 2);
120
121         assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
122         assertEquals("Prior leader journal last index", 2, leaderContext.getReplicatedLog().lastIndex());
123         assertEquals("Prior leader commit index", 2, leaderContext.getCommitIndex());
124         verifyReplicatedLogEntry(leaderContext.getReplicatedLog().get(2), currentTerm, 2, newLeaderPayload2);
125
126         assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
127                 leaderActor.underlyingActor().getState());
128
129         testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted ending");
130     }
131
132     /**
133      * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
134      * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
135      * partition attempt to commit one entry independently. After isolation is removed, the entry will conflict and both
136      * sides should reconcile their logs appropriately.
137      */
138     @Test
139     public void testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry() throws Exception {
140         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry starting");
141
142         createRaftActors();
143
144         // Submit an initial payload that is committed/applied on all nodes.
145
146         final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
147         verifyApplyJournalEntries(leaderCollectorActor, 0);
148         verifyApplyJournalEntries(follower1CollectorActor, 0);
149         verifyApplyJournalEntries(follower2CollectorActor, 0);
150
151         // Submit another payload that is replicated to all followers and committed on the leader but the leader is
152         // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
153         // with the updated leader commit index.
154
155         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
156         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
157
158         MockPayload payload1 = sendPayloadData(leaderActor, "one");
159
160         // Wait for the isolated leader to send AppendEntries to the followers with the new entry with index 1. This
161         // message is forwarded to the followers.
162
163         expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae ->
164                 ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
165                         && ae.getEntries().get(0).getData().equals(payload1));
166
167         expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae ->
168                 ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
169                         && ae.getEntries().get(0).getData().equals(payload1));
170
171         verifyApplyJournalEntries(leaderCollectorActor, 1);
172
173         isolateLeader();
174
175         // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
176
177         testLog.info("Sending payload to isolated leader");
178
179         final MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
180
181         // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
182         // is collected but not forwarded to the follower RaftActor.
183
184         AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
185         assertEquals("getTerm", currentTerm, appendEntries.getTerm());
186         assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
187         assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
188         verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
189
190         // The leader should transition to IsolatedLeader.
191
192         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
193             rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
194
195         forceElectionOnFollower1();
196
197         // Send a payload to the new leader follower1 and verify it's replicated to follower2 and committed. Since the
198         // entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
199         // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload will have index 3.
200
201         testLog.info("Sending payload to new leader");
202
203         final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
204         verifyApplyJournalEntries(follower1CollectorActor, 3);
205         verifyApplyJournalEntries(follower2CollectorActor, 3);
206
207         assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
208         assertEquals("Follower 1 journal last index", 3, follower1Context.getReplicatedLog().lastIndex());
209         assertEquals("Follower 1 commit index", 3, follower1Context.getCommitIndex());
210         verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(3), currentTerm, 3, newLeaderPayload2);
211
212         assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
213                 follower1Actor.underlyingActor().getState());
214
215         removeIsolation();
216
217         // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
218         // with a higher term.
219
220         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
221             rc -> rc.getNewRole().equals(RaftState.Follower.name()));
222
223         // The previous leader has a conflicting log entry at index 2 with a different term which should get
224         // replaced by the new leader's entry.
225
226         verifyApplyJournalEntries(leaderCollectorActor, 3);
227
228         verifyRaftState(leaderActor, raftState -> {
229             assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
230             assertEquals("Prior leader journal last index", 3, leaderContext.getReplicatedLog().lastIndex());
231             assertEquals("Prior leader commit index", 3, leaderContext.getCommitIndex());
232         });
233
234         assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
235                 leaderActor.underlyingActor().getState());
236
237         // Ensure the prior leader didn't apply its conflicting entry with index 2, term 1.
238
239         List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
240         for (ApplyState as: applyState) {
241             if (as.getReplicatedLogEntry().getIndex() == 2 && as.getReplicatedLogEntry().getTerm() == 1) {
242                 fail("Got unexpected ApplyState: " + as);
243             }
244         }
245
246         // The prior leader should not have needed a snapshot installed in order to get it synced.
247
248         assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
249
250         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry ending");
251     }
252
253     /**
254      * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
255      * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
256      * partition attempt to commit multiple entries independently. After isolation is removed, the entries will conflict
257      * and both sides should reconcile their logs appropriately.
258      */
259     @Test
260     public void testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries() throws Exception {
261         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries starting");
262
263         createRaftActors();
264
265         // Submit an initial payload that is committed/applied on all nodes.
266
267         final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
268         verifyApplyJournalEntries(leaderCollectorActor, 0);
269         verifyApplyJournalEntries(follower1CollectorActor, 0);
270         verifyApplyJournalEntries(follower2CollectorActor, 0);
271
272         // Submit another payload that is replicated to all followers and committed on the leader but the leader is
273         // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
274         // with the updated leader commit index.
275
276         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
277         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
278
279         MockPayload payload1 = sendPayloadData(leaderActor, "one");
280
281         // Wait for the isolated leader to send AppendEntries to the followers with the new entry with index 1. This
282         // message is forwarded to the followers.
283
284         expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae ->
285                 ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
286                         && ae.getEntries().get(0).getData().equals(payload1));
287
288         expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae ->
289                 ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
290                         && ae.getEntries().get(0).getData().equals(payload1));
291
292         verifyApplyJournalEntries(leaderCollectorActor, 1);
293
294         isolateLeader();
295
296         // Send 3 payloads to the isolated leader so it has uncommitted log entries.
297
298         testLog.info("Sending 3 payloads to isolated leader");
299
300         sendPayloadData(leaderActor, "two");
301         sendPayloadData(leaderActor, "three");
302         sendPayloadData(leaderActor, "four");
303
304         // Wait for the isolated leader to send AppendEntries to follower1 for each new entry. Note the messages
305         // are collected but not forwarded to the follower RaftActor.
306
307         expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
308             for (ReplicatedLogEntry e: ae.getEntries()) {
309                 if (e.getIndex() == 4) {
310                     return true;
311                 }
312             }
313             return false;
314         });
315
316         // The leader should transition to IsolatedLeader.
317
318         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
319             rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
320
321         forceElectionOnFollower1();
322
323         // Send 3 payloads to the new leader follower1 and verify they're replicated to follower2 and committed. Since
324         // the entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
325         // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload indices will start at 3.
326
327         testLog.info("Sending 3 payloads to new leader");
328
329         final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
330         final MockPayload newLeaderPayload3 = sendPayloadData(follower1Actor, "three-new");
331         final MockPayload newLeaderPayload4 = sendPayloadData(follower1Actor, "four-new");
332         verifyApplyJournalEntries(follower1CollectorActor, 5);
333         verifyApplyJournalEntries(follower2CollectorActor, 5);
334
335         assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
336         assertEquals("Follower 1 journal last index", 5, follower1Context.getReplicatedLog().lastIndex());
337         assertEquals("Follower 1 commit index", 5, follower1Context.getCommitIndex());
338         verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(5), currentTerm, 5, newLeaderPayload4);
339
340         assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2, newLeaderPayload3,
341                 newLeaderPayload4), follower1Actor.underlyingActor().getState());
342
343         removeIsolation();
344
345         // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
346         // with a higher term.
347
348         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
349             rc -> rc.getNewRole().equals(RaftState.Follower.name()));
350
351         // The previous leader has conflicting log entries starting at index 2 with different terms which should get
352         // replaced by the new leader's entries.
353
354         verifyApplyJournalEntries(leaderCollectorActor, 5);
355
356         verifyRaftState(leaderActor, raftState -> {
357             assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
358             assertEquals("Prior leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
359             assertEquals("Prior leader commit index", 5, leaderContext.getCommitIndex());
360         });
361
362         assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2, newLeaderPayload3,
363                 newLeaderPayload4), leaderActor.underlyingActor().getState());
364
365         // Ensure the prior leader didn't apply any of its conflicting entries with term 1.
366
367         List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
368         for (ApplyState as: applyState) {
369             if (as.getReplicatedLogEntry().getTerm() == 1) {
370                 fail("Got unexpected ApplyState: " + as);
371             }
372         }
373
374         // The prior leader should not have needed a snapshot installed in order to get it synced.
375
376         assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
377
378         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries ending");
379     }
380
381     private void removeIsolation() {
382         testLog.info("Removing isolation");
383
384         clearMessages(leaderNotifierActor);
385         clearMessages(leaderCollectorActor);
386
387         leaderActor.underlyingActor().stopDropMessages(AppendEntries.class);
388         leaderActor.underlyingActor().stopDropMessages(RequestVote.class);
389         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
390         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
391     }
392
393     private void forceElectionOnFollower1() {
394         // Force follower1 to start an election. follower2 should grant the vote.
395
396         testLog.info("Forcing election on {}", follower1Id);
397
398         follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
399
400         expectFirstMatching(follower1NotifierActor, RoleChanged.class,
401             rc -> rc.getNewRole().equals(RaftState.Leader.name()));
402
403         currentTerm = follower1Context.getTermInformation().getCurrentTerm();
404     }
405
406     private void isolateLeader() {
407         // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
408
409         testLog.info("Isolating the leader");
410
411         leaderActor.underlyingActor().startDropMessages(AppendEntries.class);
412         leaderActor.underlyingActor().startDropMessages(RequestVote.class);
413
414         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class,
415             ae -> ae.getLeaderId().equals(leaderId));
416         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class,
417             ae -> ae.getLeaderId().equals(leaderId));
418
419         clearMessages(follower1CollectorActor);
420         clearMessages(follower1NotifierActor);
421         clearMessages(leaderNotifierActor);
422     }
423
424     private void createRaftActors() {
425         testLog.info("createRaftActors starting");
426
427         follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
428                 factory.generateActorId(follower1Id + "-notifier"));
429
430         DefaultConfigParamsImpl followerConfigParams = new DefaultConfigParamsImpl();
431         followerConfigParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
432         followerConfigParams.setElectionTimeoutFactor(1000);
433         follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
434                 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id)))
435                 .config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
436
437         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
438                 follower1Id, testActorPath(follower1Id)), followerConfigParams);
439
440         peerAddresses = ImmutableMap.<String, String>builder()
441                 .put(follower1Id, follower1Actor.path().toString())
442                 .put(follower2Id, follower2Actor.path().toString()).build();
443
444         leaderConfigParams = newLeaderConfigParams();
445         leaderConfigParams.setIsolatedLeaderCheckInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
446
447         leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
448                 factory.generateActorId(leaderId + "-notifier"));
449
450         leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
451                 .config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
452
453         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
454         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
455         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
456
457         leaderActor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
458         waitUntilLeader(leaderActor);
459
460         expectMatching(leaderCollectorActor, AppendEntriesReply.class, 2);
461
462
463         clearMessages(leaderCollectorActor);
464         clearMessages(follower1CollectorActor);
465         clearMessages(follower2CollectorActor);
466
467         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
468         currentTerm = leaderContext.getTermInformation().getCurrentTerm();
469
470         follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
471         follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
472
473         testLog.info("createRaftActors ending");
474     }
475 }