Fix warnings in sal-akka-raft test classes
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / IsolationScenarioTest.java
1 /*
2  * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.getAllMatching;
17
18 import akka.actor.Actor;
19 import akka.actor.ActorRef;
20 import akka.actor.Props;
21 import akka.testkit.TestActorRef;
22 import com.google.common.collect.ImmutableMap;
23 import com.google.common.collect.Lists;
24 import java.util.List;
25 import java.util.concurrent.TimeUnit;
26 import org.junit.Test;
27 import org.opendaylight.controller.cluster.notifications.RoleChanged;
28 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
30 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
31 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
32 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
33 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
34 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
35 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
36 import scala.concurrent.duration.FiniteDuration;
37
38 /**
39  * Tests isolation of nodes end-to-end.
40  *
41  * @author Thomas Pantelis
42  */
43 public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
44     private TestActorRef<Actor> follower1NotifierActor;
45     private TestActorRef<Actor> leaderNotifierActor;
46
47     /**
48      * Isolates the leader after all initial payload entries have been committed and applied on all nodes. While
49      * isolated, the majority partition elects a new leader and both sides of the partition attempt to commit one entry
50      * independently. After isolation is removed, the entry will conflict and both sides should reconcile their logs
51      * appropriately.
52      */
53     @Test
54     public void testLeaderIsolationWithAllPriorEntriesCommitted() throws Exception {
55         testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted starting");
56
57         createRaftActors();
58
59         // Send an initial payloads and verify replication.
60
61         final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
62         final  MockPayload payload1 = sendPayloadData(leaderActor, "one");
63         verifyApplyJournalEntries(leaderCollectorActor, 1);
64         verifyApplyJournalEntries(follower1CollectorActor, 1);
65         verifyApplyJournalEntries(follower2CollectorActor, 1);
66
67         isolateLeader();
68
69         // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
70
71         testLog.info("Sending payload to isolated leader");
72
73         final MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
74
75         // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
76         // is collected but not forwarded to the follower RaftActor.
77
78         AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
79         assertEquals("getTerm", currentTerm, appendEntries.getTerm());
80         assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
81         assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
82         verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
83
84         // The leader should transition to IsolatedLeader.
85
86         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
87             rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
88
89         forceElectionOnFollower1();
90
91         // Send a payload to the new leader follower1 with index 2 and verify it's replicated to follower2
92         // and committed.
93
94         testLog.info("Sending payload to new leader");
95
96         final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
97         verifyApplyJournalEntries(follower1CollectorActor, 2);
98         verifyApplyJournalEntries(follower2CollectorActor, 2);
99
100         assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
101         assertEquals("Follower 1 journal last index", 2, follower1Context.getReplicatedLog().lastIndex());
102         assertEquals("Follower 1 commit index", 2, follower1Context.getCommitIndex());
103         verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(2), currentTerm, 2, newLeaderPayload2);
104
105         assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
106                 follower1Actor.underlyingActor().getState());
107
108         removeIsolation();
109
110         // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
111         // with a higher term.
112
113         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
114             rc -> rc.getNewRole().equals(RaftState.Follower.name()));
115
116         // The previous leader has a conflicting log entry at index 2 with a different term which should get
117         // replaced by the new leader's index 1 entry.
118
119         verifyApplyJournalEntries(leaderCollectorActor, 2);
120
121         assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
122         assertEquals("Prior leader journal last index", 2, leaderContext.getReplicatedLog().lastIndex());
123         assertEquals("Prior leader commit index", 2, leaderContext.getCommitIndex());
124         verifyReplicatedLogEntry(leaderContext.getReplicatedLog().get(2), currentTerm, 2, newLeaderPayload2);
125
126         assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
127                 leaderActor.underlyingActor().getState());
128
129         testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted ending");
130     }
131
132     /**
133      * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
134      * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
135      * partition attempt to commit one entry independently. After isolation is removed, the entry will conflict and both
136      * sides should reconcile their logs appropriately.
137      */
138     @Test
139     public void testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry() throws Exception {
140         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry starting");
141
142         createRaftActors();
143
144         // Submit an initial payload that is committed/applied on all nodes.
145
146         final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
147         verifyApplyJournalEntries(leaderCollectorActor, 0);
148         verifyApplyJournalEntries(follower1CollectorActor, 0);
149         verifyApplyJournalEntries(follower2CollectorActor, 0);
150
151         // Submit another payload that is replicated to all followers and committed on the leader but the leader is
152         // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
153         // with the updated leader commit index.
154
155         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
156         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
157
158         MockPayload payload1 = sendPayloadData(leaderActor, "one");
159
160         // Wait for the isolated leader to send AppendEntries to the followers with the new entry with index 1. This
161         // message is forwarded to the followers.
162
163         expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
164             return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
165                     && ae.getEntries().get(0).getData().equals(payload1);
166         });
167
168         expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae -> {
169             return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
170                     && ae.getEntries().get(0).getData().equals(payload1);
171         });
172
173         verifyApplyJournalEntries(leaderCollectorActor, 1);
174
175         isolateLeader();
176
177         // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
178
179         testLog.info("Sending payload to isolated leader");
180
181         final MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
182
183         // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
184         // is collected but not forwarded to the follower RaftActor.
185
186         AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
187         assertEquals("getTerm", currentTerm, appendEntries.getTerm());
188         assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
189         assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
190         verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
191
192         // The leader should transition to IsolatedLeader.
193
194         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
195             rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
196
197         forceElectionOnFollower1();
198
199         // Send a payload to the new leader follower1 and verify it's replicated to follower2 and committed. Since the
200         // entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
201         // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload will have index 3.
202
203         testLog.info("Sending payload to new leader");
204
205         final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
206         verifyApplyJournalEntries(follower1CollectorActor, 3);
207         verifyApplyJournalEntries(follower2CollectorActor, 3);
208
209         assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
210         assertEquals("Follower 1 journal last index", 3, follower1Context.getReplicatedLog().lastIndex());
211         assertEquals("Follower 1 commit index", 3, follower1Context.getCommitIndex());
212         verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(3), currentTerm, 3, newLeaderPayload2);
213
214         assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
215                 follower1Actor.underlyingActor().getState());
216
217         removeIsolation();
218
219         // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
220         // with a higher term.
221
222         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
223             rc -> rc.getNewRole().equals(RaftState.Follower.name()));
224
225         // The previous leader has a conflicting log entry at index 2 with a different term which should get
226         // replaced by the new leader's entry.
227
228         verifyApplyJournalEntries(leaderCollectorActor, 3);
229
230         verifyRaftState(leaderActor, raftState -> {
231             assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
232             assertEquals("Prior leader journal last index", 3, leaderContext.getReplicatedLog().lastIndex());
233             assertEquals("Prior leader commit index", 3, leaderContext.getCommitIndex());
234         });
235
236         assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
237                 leaderActor.underlyingActor().getState());
238
239         // Ensure the prior leader didn't apply its conflicting entry with index 2, term 1.
240
241         List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
242         for (ApplyState as: applyState) {
243             if (as.getReplicatedLogEntry().getIndex() == 2 && as.getReplicatedLogEntry().getTerm() == 1) {
244                 fail("Got unexpected ApplyState: " + as);
245             }
246         }
247
248         // The prior leader should not have needed a snapshot installed in order to get it synced.
249
250         assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
251
252         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry ending");
253     }
254
255     /**
256      * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
257      * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
258      * partition attempt to commit multiple entries independently. After isolation is removed, the entries will conflict
259      * and both sides should reconcile their logs appropriately.
260      */
261     @Test
262     public void testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries() throws Exception {
263         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries starting");
264
265         createRaftActors();
266
267         // Submit an initial payload that is committed/applied on all nodes.
268
269         final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
270         verifyApplyJournalEntries(leaderCollectorActor, 0);
271         verifyApplyJournalEntries(follower1CollectorActor, 0);
272         verifyApplyJournalEntries(follower2CollectorActor, 0);
273
274         // Submit another payload that is replicated to all followers and committed on the leader but the leader is
275         // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
276         // with the updated leader commit index.
277
278         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
279         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
280
281         MockPayload payload1 = sendPayloadData(leaderActor, "one");
282
283         // Wait for the isolated leader to send AppendEntries to the followers with the new entry with index 1. This
284         // message is forwarded to the followers.
285
286         expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
287             return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
288                     && ae.getEntries().get(0).getData().equals(payload1);
289         });
290
291         expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae -> {
292             return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
293                     && ae.getEntries().get(0).getData().equals(payload1);
294         });
295
296         verifyApplyJournalEntries(leaderCollectorActor, 1);
297
298         isolateLeader();
299
300         // Send 3 payloads to the isolated leader so it has uncommitted log entries.
301
302         testLog.info("Sending 3 payloads to isolated leader");
303
304         sendPayloadData(leaderActor, "two");
305         sendPayloadData(leaderActor, "three");
306         sendPayloadData(leaderActor, "four");
307
308         // Wait for the isolated leader to send AppendEntries to follower1 for each new entry. Note the messages
309         // are collected but not forwarded to the follower RaftActor.
310
311         expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
312             for (ReplicatedLogEntry e: ae.getEntries()) {
313                 if (e.getIndex() == 4) {
314                     return true;
315                 }
316             }
317             return false;
318         });
319
320         // The leader should transition to IsolatedLeader.
321
322         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
323             rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
324
325         forceElectionOnFollower1();
326
327         // Send 3 payloads to the new leader follower1 and verify they're replicated to follower2 and committed. Since
328         // the entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
329         // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload indices will start at 3.
330
331         testLog.info("Sending 3 payloads to new leader");
332
333         final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
334         final MockPayload newLeaderPayload3 = sendPayloadData(follower1Actor, "three-new");
335         final MockPayload newLeaderPayload4 = sendPayloadData(follower1Actor, "four-new");
336         verifyApplyJournalEntries(follower1CollectorActor, 5);
337         verifyApplyJournalEntries(follower2CollectorActor, 5);
338
339         assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
340         assertEquals("Follower 1 journal last index", 5, follower1Context.getReplicatedLog().lastIndex());
341         assertEquals("Follower 1 commit index", 5, follower1Context.getCommitIndex());
342         verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(5), currentTerm, 5, newLeaderPayload4);
343
344         assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2, newLeaderPayload3,
345                 newLeaderPayload4), follower1Actor.underlyingActor().getState());
346
347         removeIsolation();
348
349         // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
350         // with a higher term.
351
352         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
353             rc -> rc.getNewRole().equals(RaftState.Follower.name()));
354
355         // The previous leader has conflicting log entries starting at index 2 with different terms which should get
356         // replaced by the new leader's entries.
357
358         verifyApplyJournalEntries(leaderCollectorActor, 5);
359
360         verifyRaftState(leaderActor, raftState -> {
361             assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
362             assertEquals("Prior leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
363             assertEquals("Prior leader commit index", 5, leaderContext.getCommitIndex());
364         });
365
366         assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2, newLeaderPayload3,
367                 newLeaderPayload4), leaderActor.underlyingActor().getState());
368
369         // Ensure the prior leader didn't apply any of its conflicting entries with term 1.
370
371         List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
372         for (ApplyState as: applyState) {
373             if (as.getReplicatedLogEntry().getTerm() == 1) {
374                 fail("Got unexpected ApplyState: " + as);
375             }
376         }
377
378         // The prior leader should not have needed a snapshot installed in order to get it synced.
379
380         assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
381
382         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries ending");
383     }
384
385     private void removeIsolation() {
386         testLog.info("Removing isolation");
387
388         clearMessages(leaderNotifierActor);
389         clearMessages(leaderCollectorActor);
390
391         leaderActor.underlyingActor().stopDropMessages(AppendEntries.class);
392         leaderActor.underlyingActor().stopDropMessages(RequestVote.class);
393         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
394         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
395     }
396
397     private void forceElectionOnFollower1() {
398         // Force follower1 to start an election. follower2 should grant the vote.
399
400         testLog.info("Forcing election on {}", follower1Id);
401
402         follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
403
404         expectFirstMatching(follower1NotifierActor, RoleChanged.class,
405             rc -> rc.getNewRole().equals(RaftState.Leader.name()));
406
407         currentTerm = follower1Context.getTermInformation().getCurrentTerm();
408     }
409
410     private void isolateLeader() {
411         // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
412
413         testLog.info("Isolating the leader");
414
415         leaderActor.underlyingActor().startDropMessages(AppendEntries.class);
416         leaderActor.underlyingActor().startDropMessages(RequestVote.class);
417
418         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class,
419             ae -> ae.getLeaderId().equals(leaderId));
420         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class,
421             ae -> ae.getLeaderId().equals(leaderId));
422
423         clearMessages(follower1CollectorActor);
424         clearMessages(follower1NotifierActor);
425         clearMessages(leaderNotifierActor);
426     }
427
428     private void createRaftActors() {
429         testLog.info("createRaftActors starting");
430
431         follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
432                 factory.generateActorId(follower1Id + "-notifier"));
433
434         DefaultConfigParamsImpl followerConfigParams = new DefaultConfigParamsImpl();
435         followerConfigParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
436         followerConfigParams.setElectionTimeoutFactor(1000);
437         follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
438                 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id)))
439                 .config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
440
441         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
442                 follower1Id, testActorPath(follower1Id)), followerConfigParams);
443
444         peerAddresses = ImmutableMap.<String, String>builder()
445                 .put(follower1Id, follower1Actor.path().toString())
446                 .put(follower2Id, follower2Actor.path().toString()).build();
447
448         leaderConfigParams = newLeaderConfigParams();
449         leaderConfigParams.setIsolatedLeaderCheckInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
450
451         leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
452                 factory.generateActorId(leaderId + "-notifier"));
453
454         leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
455                 .config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
456
457         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
458         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
459         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
460
461         leaderActor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
462         waitUntilLeader(leaderActor);
463
464         expectMatching(leaderCollectorActor, AppendEntriesReply.class, 2);
465
466
467         clearMessages(leaderCollectorActor);
468         clearMessages(follower1CollectorActor);
469         clearMessages(follower2CollectorActor);
470
471         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
472         currentTerm = leaderContext.getTermInformation().getCurrentTerm();
473
474         follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
475         follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
476
477         testLog.info("createRaftActors ending");
478     }
479 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.