Bug 6540: Fix journal issues on leader changes
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / IsolationScenarioTest.java
1 /*
2  * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.getAllMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
17
18 import akka.actor.Actor;
19 import akka.actor.ActorRef;
20 import akka.actor.Props;
21 import akka.testkit.TestActorRef;
22 import com.google.common.collect.ImmutableMap;
23 import com.google.common.collect.Lists;
24 import java.util.List;
25 import java.util.concurrent.TimeUnit;
26 import org.junit.Test;
27 import org.opendaylight.controller.cluster.notifications.RoleChanged;
28 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
30 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
31 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
32 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
33 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
34 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
35 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
36 import scala.concurrent.duration.FiniteDuration;
37
38 /**
39  * Tests isolation of nodes end-to-end.
40  *
41  * @author Thomas Pantelis
42  */
43 public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
44     private TestActorRef<Actor> follower1NotifierActor;
45     private TestActorRef<Actor> leaderNotifierActor;
46
47     /**
48      * Isolates the leader after all initial payload entries have been committed and applied on all nodes. While
49      * isolated, the majority partition elects a new leader and both sides of the partition attempt to commit one entry
50      * independently. After isolation is removed, the entry will conflict and both sides should reconcile their logs
51      * appropriately.
52      */
53     @Test
54     public void testLeaderIsolationWithAllPriorEntriesCommitted() throws Exception {
55         testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted starting");
56
57         createRaftActors();
58
59         // Send an initial payloads and verify replication.
60
61         MockPayload payload0 = sendPayloadData(leaderActor, "zero");
62         MockPayload payload1 = sendPayloadData(leaderActor, "one");
63         verifyApplyJournalEntries(leaderCollectorActor, 1);
64         verifyApplyJournalEntries(follower1CollectorActor, 1);
65         verifyApplyJournalEntries(follower2CollectorActor, 1);
66
67         isolateLeader();
68
69         // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
70
71         testLog.info("Sending payload to isolated leader");
72
73         MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
74
75         // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
76         // is collected but not forwarded to the follower RaftActor.
77
78         AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
79         assertEquals("getTerm", currentTerm, appendEntries.getTerm());
80         assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
81         assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
82         verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
83
84         // The leader should transition to IsolatedLeader.
85
86         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
87                 rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
88
89         forceElectionOnFollower1();
90
91         // Send a payload to the new leader follower1 with index 2 and verify it's replicated to follower2 and committed.
92
93         testLog.info("Sending payload to new leader");
94
95         MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
96         verifyApplyJournalEntries(follower1CollectorActor, 2);
97         verifyApplyJournalEntries(follower2CollectorActor, 2);
98
99         assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
100         assertEquals("Follower 1 journal last index", 2, follower1Context.getReplicatedLog().lastIndex());
101         assertEquals("Follower 1 commit index", 2, follower1Context.getCommitIndex());
102         verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(2), currentTerm, 2, newLeaderPayload2);
103
104         assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
105                 follower1Actor.underlyingActor().getState());
106
107         removeIsolation();
108
109         // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
110         // with a higher term.
111
112         expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.getNewRole().equals(RaftState.Follower.name()));
113
114         // The previous leader has a conflicting log entry at index 2 with a different term which should get
115         // replaced by the new leader's index 1 entry.
116
117         verifyApplyJournalEntries(leaderCollectorActor, 2);
118
119         assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
120         assertEquals("Prior leader journal last index", 2, leaderContext.getReplicatedLog().lastIndex());
121         assertEquals("Prior leader commit index", 2, leaderContext.getCommitIndex());
122         verifyReplicatedLogEntry(leaderContext.getReplicatedLog().get(2), currentTerm, 2, newLeaderPayload2);
123
124         assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
125                 leaderActor.underlyingActor().getState());
126
127         testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted ending");
128     }
129
130     /**
131      * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
132      * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
133      * partition attempt to commit one entry independently. After isolation is removed, the entry will conflict and both
134      * sides should reconcile their logs appropriately.
135      */
136     @Test
137     public void testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry() throws Exception {
138         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry starting");
139
140         createRaftActors();
141
142         // Submit an initial payload that is committed/applied on all nodes.
143
144         MockPayload payload0 = sendPayloadData(leaderActor, "zero");
145         verifyApplyJournalEntries(leaderCollectorActor, 0);
146         verifyApplyJournalEntries(follower1CollectorActor, 0);
147         verifyApplyJournalEntries(follower2CollectorActor, 0);
148
149         // Submit another payload that is replicated to all followers and committed on the leader but the leader is
150         // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
151         // with the updated leader commit index.
152
153         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
154         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
155
156         MockPayload payload1 = sendPayloadData(leaderActor, "one");
157
158         // Wait for the isolated leader to send AppendEntries to the followers with the new entry with index 1. This
159         // message is forwarded to the followers.
160
161         expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
162             return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
163                     ae.getEntries().get(0).getData().equals(payload1);
164         });
165
166         expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae -> {
167             return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
168                     ae.getEntries().get(0).getData().equals(payload1);
169         });
170
171         verifyApplyJournalEntries(leaderCollectorActor, 1);
172
173         isolateLeader();
174
175         // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
176
177         testLog.info("Sending payload to isolated leader");
178
179         MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
180
181         // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
182         // is collected but not forwarded to the follower RaftActor.
183
184         AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
185         assertEquals("getTerm", currentTerm, appendEntries.getTerm());
186         assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
187         assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
188         verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
189
190         // The leader should transition to IsolatedLeader.
191
192         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
193                 rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
194
195         forceElectionOnFollower1();
196
197         // Send a payload to the new leader follower1 and verify it's replicated to follower2 and committed. Since the
198         // entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
199         // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload will have index 3.
200
201         testLog.info("Sending payload to new leader");
202
203         MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
204         verifyApplyJournalEntries(follower1CollectorActor, 3);
205         verifyApplyJournalEntries(follower2CollectorActor, 3);
206
207         assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
208         assertEquals("Follower 1 journal last index", 3, follower1Context.getReplicatedLog().lastIndex());
209         assertEquals("Follower 1 commit index", 3, follower1Context.getCommitIndex());
210         verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(3), currentTerm, 3, newLeaderPayload2);
211
212         assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
213                 follower1Actor.underlyingActor().getState());
214
215         removeIsolation();
216
217         // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
218         // with a higher term.
219
220         expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.getNewRole().equals(RaftState.Follower.name()));
221
222         // The previous leader has a conflicting log entry at index 2 with a different term which should get
223         // replaced by the new leader's entry.
224
225         verifyApplyJournalEntries(leaderCollectorActor, 3);
226
227         verifyRaftState(leaderActor, raftState -> {
228             assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
229             assertEquals("Prior leader journal last index", 3, leaderContext.getReplicatedLog().lastIndex());
230             assertEquals("Prior leader commit index", 3, leaderContext.getCommitIndex());
231         });
232
233         assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
234                 leaderActor.underlyingActor().getState());
235
236         // Ensure the prior leader didn't apply its conflicting entry with index 2, term 1.
237
238         List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
239         for(ApplyState as: applyState) {
240             if(as.getReplicatedLogEntry().getIndex() == 2 && as.getReplicatedLogEntry().getTerm() == 1) {
241                 fail("Got unexpected ApplyState: " + as);
242             }
243         }
244
245         // The prior leader should not have needed a snapshot installed in order to get it synced.
246
247         assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
248
249         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry ending");
250     }
251
252     /**
253      * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
254      * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
255      * partition attempt to commit multiple entries independently. After isolation is removed, the entries will conflict
256      * and both sides should reconcile their logs appropriately.
257      */
258     @Test
259     public void testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries() throws Exception {
260         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries starting");
261
262         createRaftActors();
263
264         // Submit an initial payload that is committed/applied on all nodes.
265
266         MockPayload payload0 = sendPayloadData(leaderActor, "zero");
267         verifyApplyJournalEntries(leaderCollectorActor, 0);
268         verifyApplyJournalEntries(follower1CollectorActor, 0);
269         verifyApplyJournalEntries(follower2CollectorActor, 0);
270
271         // Submit another payload that is replicated to all followers and committed on the leader but the leader is
272         // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
273         // with the updated leader commit index.
274
275         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
276         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
277
278         MockPayload payload1 = sendPayloadData(leaderActor, "one");
279
280         // Wait for the isolated leader to send AppendEntries to the followers with the new entry with index 1. This
281         // message is forwarded to the followers.
282
283         expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
284             return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
285                     ae.getEntries().get(0).getData().equals(payload1);
286         });
287
288         expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae -> {
289             return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
290                     ae.getEntries().get(0).getData().equals(payload1);
291         });
292
293         verifyApplyJournalEntries(leaderCollectorActor, 1);
294
295         isolateLeader();
296
297         // Send 3 payloads to the isolated leader so it has uncommitted log entries.
298
299         testLog.info("Sending 3 payloads to isolated leader");
300
301         sendPayloadData(leaderActor, "two");
302         sendPayloadData(leaderActor, "three");
303         sendPayloadData(leaderActor, "four");
304
305         // Wait for the isolated leader to send AppendEntries to follower1 for each new entry. Note the messages
306         // are collected but not forwarded to the follower RaftActor.
307
308         expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
309             for(ReplicatedLogEntry e: ae.getEntries()) {
310                 if(e.getIndex() == 4) {
311                     return true;
312                 }
313             }
314             return false;
315         });
316
317         // The leader should transition to IsolatedLeader.
318
319         expectFirstMatching(leaderNotifierActor, RoleChanged.class,
320                 rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
321
322         forceElectionOnFollower1();
323
324         // Send 3 payloads to the new leader follower1 and verify they're replicated to follower2 and committed. Since
325         // the entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
326         // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload indices will start at 3.
327
328         testLog.info("Sending 3 payloads to new leader");
329
330         MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
331         MockPayload newLeaderPayload3 = sendPayloadData(follower1Actor, "three-new");
332         MockPayload newLeaderPayload4 = sendPayloadData(follower1Actor, "four-new");
333         verifyApplyJournalEntries(follower1CollectorActor, 5);
334         verifyApplyJournalEntries(follower2CollectorActor, 5);
335
336         assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
337         assertEquals("Follower 1 journal last index", 5, follower1Context.getReplicatedLog().lastIndex());
338         assertEquals("Follower 1 commit index", 5, follower1Context.getCommitIndex());
339         verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(5), currentTerm, 5, newLeaderPayload4);
340
341         assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2, newLeaderPayload3,
342                 newLeaderPayload4), follower1Actor.underlyingActor().getState());
343
344         removeIsolation();
345
346         // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
347         // with a higher term.
348
349         expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.getNewRole().equals(RaftState.Follower.name()));
350
351         // The previous leader has conflicting log entries starting at index 2 with different terms which should get
352         // replaced by the new leader's entries.
353
354         verifyApplyJournalEntries(leaderCollectorActor, 5);
355
356         verifyRaftState(leaderActor, raftState -> {
357             assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
358             assertEquals("Prior leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
359             assertEquals("Prior leader commit index", 5, leaderContext.getCommitIndex());
360         });
361
362         assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2, newLeaderPayload3,
363                 newLeaderPayload4), leaderActor.underlyingActor().getState());
364
365         // Ensure the prior leader didn't apply any of its conflicting entries with term 1.
366
367         List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
368         for(ApplyState as: applyState) {
369             if(as.getReplicatedLogEntry().getTerm() == 1) {
370                 fail("Got unexpected ApplyState: " + as);
371             }
372         }
373
374         // The prior leader should not have needed a snapshot installed in order to get it synced.
375
376         assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
377
378         testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries ending");
379     }
380
381     private void removeIsolation() {
382         testLog.info("Removing isolation");
383
384         clearMessages(leaderNotifierActor);
385         clearMessages(leaderCollectorActor);
386
387         leaderActor.underlyingActor().stopDropMessages(AppendEntries.class);
388         leaderActor.underlyingActor().stopDropMessages(RequestVote.class);
389         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
390         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
391     }
392
393     private void forceElectionOnFollower1() {
394         // Force follower1 to start an election. follower2 should grant the vote.
395
396         testLog.info("Forcing election on {}", follower1Id);
397
398         follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
399
400         expectFirstMatching(follower1NotifierActor, RoleChanged.class,
401                 rc -> rc.getNewRole().equals(RaftState.Leader.name()));
402
403         currentTerm = follower1Context.getTermInformation().getCurrentTerm();
404     }
405
406     private void isolateLeader() {
407         // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
408
409         testLog.info("Isolating the leader");
410
411         leaderActor.underlyingActor().startDropMessages(AppendEntries.class);
412         leaderActor.underlyingActor().startDropMessages(RequestVote.class);
413
414         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderId().equals(leaderId));
415         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderId().equals(leaderId));
416
417         clearMessages(follower1CollectorActor);
418         clearMessages(follower1NotifierActor);
419         clearMessages(leaderNotifierActor);
420     }
421
422     private void createRaftActors() {
423         testLog.info("createRaftActors starting");
424
425         follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
426                 factory.generateActorId(follower1Id + "-notifier"));
427
428         DefaultConfigParamsImpl followerConfigParams = new DefaultConfigParamsImpl();
429         followerConfigParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
430         followerConfigParams.setElectionTimeoutFactor(1000);
431         follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
432                 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))).
433                 config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
434
435         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
436                 follower1Id, testActorPath(follower1Id)), followerConfigParams);
437
438         peerAddresses = ImmutableMap.<String, String>builder().
439                 put(follower1Id, follower1Actor.path().toString()).
440                 put(follower2Id, follower2Actor.path().toString()).build();
441
442         leaderConfigParams = newLeaderConfigParams();
443         leaderConfigParams.setIsolatedLeaderCheckInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
444
445         leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
446                 factory.generateActorId(leaderId + "-notifier"));
447
448         leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
449                 config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
450
451         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
452         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
453         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
454
455         leaderActor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
456         waitUntilLeader(leaderActor);
457
458         expectMatching(leaderCollectorActor, AppendEntriesReply.class, 2);
459
460
461         clearMessages(leaderCollectorActor);
462         clearMessages(follower1CollectorActor);
463         clearMessages(follower2CollectorActor);
464
465         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
466         currentTerm = leaderContext.getTermInformation().getCurrentTerm();
467
468         follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
469         follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
470
471         testLog.info("createRaftActors ending");
472     }
473 }