5179ddc4816ebfa19ca26800a7b84e1444cafde6
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.actor.Terminated;
18 import akka.testkit.JavaTestKit;
19 import akka.testkit.TestActorRef;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.util.concurrent.Uninterruptibles;
22 import com.google.protobuf.ByteString;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.TimeUnit;
28 import org.junit.After;
29 import org.junit.Assert;
30 import org.junit.Test;
31 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
32 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
33 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
34 import org.opendaylight.controller.cluster.raft.RaftActorContext;
35 import org.opendaylight.controller.cluster.raft.RaftState;
36 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
37 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
38 import org.opendaylight.controller.cluster.raft.SerializationUtils;
39 import org.opendaylight.controller.cluster.raft.Snapshot;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
41 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
42 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
44 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
45 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
46 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
47 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
48 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
49 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
50 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
51 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
52 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
53 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
54 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
55 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
56 import scala.concurrent.duration.FiniteDuration;
57
58 public class LeaderTest extends AbstractLeaderTest {
59
60     static final String FOLLOWER_ID = "follower";
61     public static final String LEADER_ID = "leader";
62
63     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
64             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
65
66     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
67             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
68
69     private Leader leader;
70     private final short payloadVersion = 5;
71
72     @Override
73     @After
74     public void tearDown() throws Exception {
75         if(leader != null) {
76             leader.close();
77         }
78
79         super.tearDown();
80     }
81
82     @Test
83     public void testHandleMessageForUnknownMessage() throws Exception {
84         logStart("testHandleMessageForUnknownMessage");
85
86         leader = new Leader(createActorContext());
87
88         // handle message should return the Leader state when it receives an
89         // unknown message
90         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
91         Assert.assertTrue(behavior instanceof Leader);
92     }
93
94     @Test
95     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
96         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
97
98         MockRaftActorContext actorContext = createActorContextWithFollower();
99         short payloadVersion = (short)5;
100         actorContext.setPayloadVersion(payloadVersion);
101
102         long term = 1;
103         actorContext.getTermInformation().update(term, "");
104
105         leader = new Leader(actorContext);
106
107         // Leader should send an immediate heartbeat with no entries as follower is inactive.
108         long lastIndex = actorContext.getReplicatedLog().lastIndex();
109         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
110         assertEquals("getTerm", term, appendEntries.getTerm());
111         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
112         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
113         assertEquals("Entries size", 0, appendEntries.getEntries().size());
114         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
115
116         // The follower would normally reply - simulate that explicitly here.
117         leader.handleMessage(followerActor, new AppendEntriesReply(
118                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
119         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
120
121         followerActor.underlyingActor().clear();
122
123         // Sleep for the heartbeat interval so AppendEntries is sent.
124         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
125                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
126
127         leader.handleMessage(leaderActor, new SendHeartBeat());
128
129         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
130         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
131         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
132         assertEquals("Entries size", 1, appendEntries.getEntries().size());
133         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
134         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
135         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
136     }
137
138
139     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
140         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
141         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
142                 1, index, payload);
143         actorContext.getReplicatedLog().append(newEntry);
144         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
145     }
146
147     @Test
148     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
149         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
150
151         MockRaftActorContext actorContext = createActorContextWithFollower();
152
153         long term = 1;
154         actorContext.getTermInformation().update(term, "");
155
156         leader = new Leader(actorContext);
157
158         // Leader will send an immediate heartbeat - ignore it.
159         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
160
161         // The follower would normally reply - simulate that explicitly here.
162         long lastIndex = actorContext.getReplicatedLog().lastIndex();
163         leader.handleMessage(followerActor, new AppendEntriesReply(
164                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
165         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
166
167         followerActor.underlyingActor().clear();
168
169         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
170
171         // State should not change
172         assertTrue(raftBehavior instanceof Leader);
173
174         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
175         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
176         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
177         assertEquals("Entries size", 1, appendEntries.getEntries().size());
178         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
179         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
180         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
181         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
182     }
183
184     @Test
185     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
186         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
187
188         MockRaftActorContext actorContext = createActorContextWithFollower();
189         actorContext.setRaftPolicy(createRaftPolicy(true, true));
190
191         long term = 1;
192         actorContext.getTermInformation().update(term, "");
193
194         leader = new Leader(actorContext);
195
196         // Leader will send an immediate heartbeat - ignore it.
197         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
198
199         // The follower would normally reply - simulate that explicitly here.
200         long lastIndex = actorContext.getReplicatedLog().lastIndex();
201         leader.handleMessage(followerActor, new AppendEntriesReply(
202                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
203         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
204
205         followerActor.underlyingActor().clear();
206
207         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
208
209         // State should not change
210         assertTrue(raftBehavior instanceof Leader);
211
212         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
213         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
214         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
215         assertEquals("Entries size", 1, appendEntries.getEntries().size());
216         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
217         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
218         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
219         assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
220     }
221
222     @Test
223     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
224         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
225
226         MockRaftActorContext actorContext = createActorContextWithFollower();
227         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
228             @Override
229             public FiniteDuration getHeartBeatInterval() {
230                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
231             }
232         });
233
234         long term = 1;
235         actorContext.getTermInformation().update(term, "");
236
237         leader = new Leader(actorContext);
238
239         // Leader will send an immediate heartbeat - ignore it.
240         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
241
242         // The follower would normally reply - simulate that explicitly here.
243         long lastIndex = actorContext.getReplicatedLog().lastIndex();
244         leader.handleMessage(followerActor, new AppendEntriesReply(
245                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
246         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
247
248         followerActor.underlyingActor().clear();
249
250         for(int i=0;i<5;i++) {
251             sendReplicate(actorContext, lastIndex+i+1);
252         }
253
254         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
255         // We expect only 1 message to be sent because of two reasons,
256         // - an append entries reply was not received
257         // - the heartbeat interval has not expired
258         // In this scenario if multiple messages are sent they would likely be duplicates
259         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
260     }
261
262     @Test
263     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
264         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
265
266         MockRaftActorContext actorContext = createActorContextWithFollower();
267         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
268             @Override
269             public FiniteDuration getHeartBeatInterval() {
270                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
271             }
272         });
273
274         long term = 1;
275         actorContext.getTermInformation().update(term, "");
276
277         leader = new Leader(actorContext);
278
279         // Leader will send an immediate heartbeat - ignore it.
280         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
281
282         // The follower would normally reply - simulate that explicitly here.
283         long lastIndex = actorContext.getReplicatedLog().lastIndex();
284         leader.handleMessage(followerActor, new AppendEntriesReply(
285                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
286         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
287
288         followerActor.underlyingActor().clear();
289
290         for(int i=0;i<3;i++) {
291             sendReplicate(actorContext, lastIndex+i+1);
292             leader.handleMessage(followerActor, new AppendEntriesReply(
293                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
294
295         }
296
297         for(int i=3;i<5;i++) {
298             sendReplicate(actorContext, lastIndex + i + 1);
299         }
300
301         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
302         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
303         // get sent to the follower - but not the 5th
304         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
305
306         for(int i=0;i<4;i++) {
307             long expected = allMessages.get(i).getEntries().get(0).getIndex();
308             assertEquals(expected, i+2);
309         }
310     }
311
312     @Test
313     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
314         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
315
316         MockRaftActorContext actorContext = createActorContextWithFollower();
317         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
318             @Override
319             public FiniteDuration getHeartBeatInterval() {
320                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
321             }
322         });
323
324         long term = 1;
325         actorContext.getTermInformation().update(term, "");
326
327         leader = new Leader(actorContext);
328
329         // Leader will send an immediate heartbeat - ignore it.
330         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
331
332         // The follower would normally reply - simulate that explicitly here.
333         long lastIndex = actorContext.getReplicatedLog().lastIndex();
334         leader.handleMessage(followerActor, new AppendEntriesReply(
335                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
336         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
337
338         followerActor.underlyingActor().clear();
339
340         sendReplicate(actorContext, lastIndex+1);
341
342         // Wait slightly longer than heartbeat duration
343         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
344
345         leader.handleMessage(leaderActor, new SendHeartBeat());
346
347         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
348         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
349
350         assertEquals(1, allMessages.get(0).getEntries().size());
351         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
352         assertEquals(1, allMessages.get(1).getEntries().size());
353         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
354
355     }
356
357     @Test
358     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
359         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
360
361         MockRaftActorContext actorContext = createActorContextWithFollower();
362         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
363             @Override
364             public FiniteDuration getHeartBeatInterval() {
365                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
366             }
367         });
368
369         long term = 1;
370         actorContext.getTermInformation().update(term, "");
371
372         leader = new Leader(actorContext);
373
374         // Leader will send an immediate heartbeat - ignore it.
375         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
376
377         // The follower would normally reply - simulate that explicitly here.
378         long lastIndex = actorContext.getReplicatedLog().lastIndex();
379         leader.handleMessage(followerActor, new AppendEntriesReply(
380                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
381         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
382
383         followerActor.underlyingActor().clear();
384
385         for(int i=0;i<3;i++) {
386             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
387             leader.handleMessage(leaderActor, new SendHeartBeat());
388         }
389
390         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
391         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
392     }
393
394     @Test
395     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
396         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
397
398         MockRaftActorContext actorContext = createActorContextWithFollower();
399         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
400             @Override
401             public FiniteDuration getHeartBeatInterval() {
402                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
403             }
404         });
405
406         long term = 1;
407         actorContext.getTermInformation().update(term, "");
408
409         leader = new Leader(actorContext);
410
411         // Leader will send an immediate heartbeat - ignore it.
412         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
413
414         // The follower would normally reply - simulate that explicitly here.
415         long lastIndex = actorContext.getReplicatedLog().lastIndex();
416         leader.handleMessage(followerActor, new AppendEntriesReply(
417                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
418         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
419
420         followerActor.underlyingActor().clear();
421
422         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
423         leader.handleMessage(leaderActor, new SendHeartBeat());
424         sendReplicate(actorContext, lastIndex+1);
425
426         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
427         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
428
429         assertEquals(0, allMessages.get(0).getEntries().size());
430         assertEquals(1, allMessages.get(1).getEntries().size());
431     }
432
433
434     @Test
435     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
436         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
437
438         MockRaftActorContext actorContext = createActorContext();
439
440         leader = new Leader(actorContext);
441
442         actorContext.setLastApplied(0);
443
444         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
445         long term = actorContext.getTermInformation().getCurrentTerm();
446         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
447                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
448
449         actorContext.getReplicatedLog().append(newEntry);
450
451         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
452                 new Replicate(leaderActor, "state-id", newEntry));
453
454         // State should not change
455         assertTrue(raftBehavior instanceof Leader);
456
457         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
458
459         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
460         // one since lastApplied state is 0.
461         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
462                 leaderActor, ApplyState.class);
463         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
464
465         for(int i = 0; i <= newLogIndex - 1; i++ ) {
466             ApplyState applyState = applyStateList.get(i);
467             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
468             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
469         }
470
471         ApplyState last = applyStateList.get((int) newLogIndex - 1);
472         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
473         assertEquals("getIdentifier", "state-id", last.getIdentifier());
474     }
475
476     @Test
477     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
478         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
479
480         MockRaftActorContext actorContext = createActorContextWithFollower();
481
482         Map<String, String> leadersSnapshot = new HashMap<>();
483         leadersSnapshot.put("1", "A");
484         leadersSnapshot.put("2", "B");
485         leadersSnapshot.put("3", "C");
486
487         //clears leaders log
488         actorContext.getReplicatedLog().removeFrom(0);
489
490         final int commitIndex = 3;
491         final int snapshotIndex = 2;
492         final int newEntryIndex = 4;
493         final int snapshotTerm = 1;
494         final int currentTerm = 2;
495
496         // set the snapshot variables in replicatedlog
497         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
498         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
499         actorContext.setCommitIndex(commitIndex);
500         //set follower timeout to 2 mins, helps during debugging
501         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
502
503         leader = new Leader(actorContext);
504
505         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
506         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
507
508         // new entry
509         ReplicatedLogImplEntry entry =
510                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
511                         new MockRaftActorContext.MockPayload("D"));
512
513         //update follower timestamp
514         leader.markFollowerActive(FOLLOWER_ID);
515
516         ByteString bs = toByteString(leadersSnapshot);
517         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
518                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
519         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
520         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
521
522         //send first chunk and no InstallSnapshotReply received yet
523         fts.getNextChunk();
524         fts.incrementChunkIndex();
525
526         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
527                 TimeUnit.MILLISECONDS);
528
529         leader.handleMessage(leaderActor, new SendHeartBeat());
530
531         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
532
533         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
534
535         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
536
537         //InstallSnapshotReply received
538         fts.markSendStatus(true);
539
540         leader.handleMessage(leaderActor, new SendHeartBeat());
541
542         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
543
544         assertEquals(commitIndex, is.getLastIncludedIndex());
545     }
546
547     @Test
548     public void testSendAppendEntriesSnapshotScenario() throws Exception {
549         logStart("testSendAppendEntriesSnapshotScenario");
550
551         MockRaftActorContext actorContext = createActorContextWithFollower();
552
553         Map<String, String> leadersSnapshot = new HashMap<>();
554         leadersSnapshot.put("1", "A");
555         leadersSnapshot.put("2", "B");
556         leadersSnapshot.put("3", "C");
557
558         //clears leaders log
559         actorContext.getReplicatedLog().removeFrom(0);
560
561         final int followersLastIndex = 2;
562         final int snapshotIndex = 3;
563         final int newEntryIndex = 4;
564         final int snapshotTerm = 1;
565         final int currentTerm = 2;
566
567         // set the snapshot variables in replicatedlog
568         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
569         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
570         actorContext.setCommitIndex(followersLastIndex);
571
572         leader = new Leader(actorContext);
573
574         // Leader will send an immediate heartbeat - ignore it.
575         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
576
577         // new entry
578         ReplicatedLogImplEntry entry =
579                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
580                         new MockRaftActorContext.MockPayload("D"));
581
582         actorContext.getReplicatedLog().append(entry);
583
584         //update follower timestamp
585         leader.markFollowerActive(FOLLOWER_ID);
586
587         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
588         RaftActorBehavior raftBehavior = leader.handleMessage(
589                 leaderActor, new Replicate(null, "state-id", entry));
590
591         assertTrue(raftBehavior instanceof Leader);
592
593         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
594     }
595
596     @Test
597     public void testInitiateInstallSnapshot() throws Exception {
598         logStart("testInitiateInstallSnapshot");
599
600         MockRaftActorContext actorContext = createActorContextWithFollower();
601
602         Map<String, String> leadersSnapshot = new HashMap<>();
603         leadersSnapshot.put("1", "A");
604         leadersSnapshot.put("2", "B");
605         leadersSnapshot.put("3", "C");
606
607         //clears leaders log
608         actorContext.getReplicatedLog().removeFrom(0);
609
610         final int followersLastIndex = 2;
611         final int snapshotIndex = 3;
612         final int newEntryIndex = 4;
613         final int snapshotTerm = 1;
614         final int currentTerm = 2;
615
616         // set the snapshot variables in replicatedlog
617         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
618         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
619         actorContext.setLastApplied(3);
620         actorContext.setCommitIndex(followersLastIndex);
621
622         leader = new Leader(actorContext);
623
624         // Leader will send an immediate heartbeat - ignore it.
625         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
626
627         // set the snapshot as absent and check if capture-snapshot is invoked.
628         leader.setSnapshot(null);
629
630         // new entry
631         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
632                 new MockRaftActorContext.MockPayload("D"));
633
634         actorContext.getReplicatedLog().append(entry);
635
636         //update follower timestamp
637         leader.markFollowerActive(FOLLOWER_ID);
638
639         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
640
641         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
642
643         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
644
645         assertTrue(cs.isInstallSnapshotInitiated());
646         assertEquals(3, cs.getLastAppliedIndex());
647         assertEquals(1, cs.getLastAppliedTerm());
648         assertEquals(4, cs.getLastIndex());
649         assertEquals(2, cs.getLastTerm());
650
651         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
652         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
653
654         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
655     }
656
657     @Test
658     public void testInstallSnapshot() throws Exception {
659         logStart("testInstallSnapshot");
660
661         MockRaftActorContext actorContext = createActorContextWithFollower();
662
663         Map<String, String> leadersSnapshot = new HashMap<>();
664         leadersSnapshot.put("1", "A");
665         leadersSnapshot.put("2", "B");
666         leadersSnapshot.put("3", "C");
667
668         //clears leaders log
669         actorContext.getReplicatedLog().removeFrom(0);
670
671         final int lastAppliedIndex = 3;
672         final int snapshotIndex = 2;
673         final int snapshotTerm = 1;
674         final int currentTerm = 2;
675
676         // set the snapshot variables in replicatedlog
677         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
678         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
679         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
680         actorContext.setCommitIndex(lastAppliedIndex);
681         actorContext.setLastApplied(lastAppliedIndex);
682
683         leader = new Leader(actorContext);
684
685         // Initial heartbeat.
686         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
687
688         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
689         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
690
691         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
692                 Collections.<ReplicatedLogEntry>emptyList(),
693                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
694
695         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
696
697         assertTrue(raftBehavior instanceof Leader);
698
699         // check if installsnapshot gets called with the correct values.
700
701         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
702
703         assertNotNull(installSnapshot.getData());
704         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
705         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
706
707         assertEquals(currentTerm, installSnapshot.getTerm());
708     }
709
710     @Test
711     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
712         logStart("testHandleInstallSnapshotReplyLastChunk");
713
714         MockRaftActorContext actorContext = createActorContextWithFollower();
715
716         final int commitIndex = 3;
717         final int snapshotIndex = 2;
718         final int snapshotTerm = 1;
719         final int currentTerm = 2;
720
721         actorContext.setCommitIndex(commitIndex);
722
723         leader = new Leader(actorContext);
724
725         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
726         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
727
728         // Ignore initial heartbeat.
729         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
730
731         Map<String, String> leadersSnapshot = new HashMap<>();
732         leadersSnapshot.put("1", "A");
733         leadersSnapshot.put("2", "B");
734         leadersSnapshot.put("3", "C");
735
736         // set the snapshot variables in replicatedlog
737
738         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
739         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
740         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
741
742         ByteString bs = toByteString(leadersSnapshot);
743         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
744                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
745         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
746         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
747         while(!fts.isLastChunk(fts.getChunkIndex())) {
748             fts.getNextChunk();
749             fts.incrementChunkIndex();
750         }
751
752         //clears leaders log
753         actorContext.getReplicatedLog().removeFrom(0);
754
755         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
756                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
757
758         assertTrue(raftBehavior instanceof Leader);
759
760         assertEquals(0, leader.followerSnapshotSize());
761         assertEquals(1, leader.followerLogSize());
762         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
763         assertNotNull(fli);
764         assertEquals(commitIndex, fli.getMatchIndex());
765         assertEquals(commitIndex + 1, fli.getNextIndex());
766     }
767
768     @Test
769     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
770         logStart("testSendSnapshotfromInstallSnapshotReply");
771
772         MockRaftActorContext actorContext = createActorContextWithFollower();
773
774         final int commitIndex = 3;
775         final int snapshotIndex = 2;
776         final int snapshotTerm = 1;
777         final int currentTerm = 2;
778
779         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
780             @Override
781             public int getSnapshotChunkSize() {
782                 return 50;
783             }
784         };
785         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
786         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
787
788         actorContext.setConfigParams(configParams);
789         actorContext.setCommitIndex(commitIndex);
790
791         leader = new Leader(actorContext);
792
793         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
794         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
795
796         Map<String, String> leadersSnapshot = new HashMap<>();
797         leadersSnapshot.put("1", "A");
798         leadersSnapshot.put("2", "B");
799         leadersSnapshot.put("3", "C");
800
801         // set the snapshot variables in replicatedlog
802         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
803         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
804         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
805
806         ByteString bs = toByteString(leadersSnapshot);
807         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
808                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
809         leader.setSnapshot(snapshot);
810
811         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
812
813         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
814
815         assertEquals(1, installSnapshot.getChunkIndex());
816         assertEquals(3, installSnapshot.getTotalChunks());
817
818         followerActor.underlyingActor().clear();
819         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
820                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
821
822         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
823
824         assertEquals(2, installSnapshot.getChunkIndex());
825         assertEquals(3, installSnapshot.getTotalChunks());
826
827         followerActor.underlyingActor().clear();
828         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
829                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
830
831         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
832
833         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
834         followerActor.underlyingActor().clear();
835         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
836                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
837
838         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
839
840         Assert.assertNull(installSnapshot);
841     }
842
843
844     @Test
845     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
846         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
847
848         MockRaftActorContext actorContext = createActorContextWithFollower();
849
850         final int commitIndex = 3;
851         final int snapshotIndex = 2;
852         final int snapshotTerm = 1;
853         final int currentTerm = 2;
854
855         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
856             @Override
857             public int getSnapshotChunkSize() {
858                 return 50;
859             }
860         });
861
862         actorContext.setCommitIndex(commitIndex);
863
864         leader = new Leader(actorContext);
865
866         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
867         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
868
869         Map<String, String> leadersSnapshot = new HashMap<>();
870         leadersSnapshot.put("1", "A");
871         leadersSnapshot.put("2", "B");
872         leadersSnapshot.put("3", "C");
873
874         // set the snapshot variables in replicatedlog
875         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
876         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
877         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
878
879         ByteString bs = toByteString(leadersSnapshot);
880         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
881                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
882         leader.setSnapshot(snapshot);
883
884         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
885         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
886
887         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
888
889         assertEquals(1, installSnapshot.getChunkIndex());
890         assertEquals(3, installSnapshot.getTotalChunks());
891
892         followerActor.underlyingActor().clear();
893
894         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
895                 FOLLOWER_ID, -1, false));
896
897         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
898                 TimeUnit.MILLISECONDS);
899
900         leader.handleMessage(leaderActor, new SendHeartBeat());
901
902         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
903
904         assertEquals(1, installSnapshot.getChunkIndex());
905         assertEquals(3, installSnapshot.getTotalChunks());
906     }
907
908     @Test
909     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
910         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
911
912         MockRaftActorContext actorContext = createActorContextWithFollower();
913
914         final int commitIndex = 3;
915         final int snapshotIndex = 2;
916         final int snapshotTerm = 1;
917         final int currentTerm = 2;
918
919         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
920             @Override
921             public int getSnapshotChunkSize() {
922                 return 50;
923             }
924         });
925
926         actorContext.setCommitIndex(commitIndex);
927
928         leader = new Leader(actorContext);
929
930         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
931         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
932
933         Map<String, String> leadersSnapshot = new HashMap<>();
934         leadersSnapshot.put("1", "A");
935         leadersSnapshot.put("2", "B");
936         leadersSnapshot.put("3", "C");
937
938         // set the snapshot variables in replicatedlog
939         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
940         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
941         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
942
943         ByteString bs = toByteString(leadersSnapshot);
944         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
945                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
946         leader.setSnapshot(snapshot);
947
948         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
949
950         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
951
952         assertEquals(1, installSnapshot.getChunkIndex());
953         assertEquals(3, installSnapshot.getTotalChunks());
954         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
955
956         int hashCode = installSnapshot.getData().hashCode();
957
958         followerActor.underlyingActor().clear();
959
960         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
961                 FOLLOWER_ID, 1, true));
962
963         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
964
965         assertEquals(2, installSnapshot.getChunkIndex());
966         assertEquals(3, installSnapshot.getTotalChunks());
967         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
968     }
969
970     @Test
971     public void testFollowerToSnapshotLogic() {
972         logStart("testFollowerToSnapshotLogic");
973
974         MockRaftActorContext actorContext = createActorContext();
975
976         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
977             @Override
978             public int getSnapshotChunkSize() {
979                 return 50;
980             }
981         });
982
983         leader = new Leader(actorContext);
984
985         Map<String, String> leadersSnapshot = new HashMap<>();
986         leadersSnapshot.put("1", "A");
987         leadersSnapshot.put("2", "B");
988         leadersSnapshot.put("3", "C");
989
990         ByteString bs = toByteString(leadersSnapshot);
991         byte[] barray = bs.toByteArray();
992
993         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
994         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
995
996         assertEquals(bs.size(), barray.length);
997
998         int chunkIndex=0;
999         for (int i=0; i < barray.length; i = i + 50) {
1000             int j = i + 50;
1001             chunkIndex++;
1002
1003             if (i + 50 > barray.length) {
1004                 j = barray.length;
1005             }
1006
1007             ByteString chunk = fts.getNextChunk();
1008             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
1009             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1010
1011             fts.markSendStatus(true);
1012             if (!fts.isLastChunk(chunkIndex)) {
1013                 fts.incrementChunkIndex();
1014             }
1015         }
1016
1017         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1018     }
1019
1020     @Override protected RaftActorBehavior createBehavior(
1021         RaftActorContext actorContext) {
1022         return new Leader(actorContext);
1023     }
1024
1025     @Override
1026     protected MockRaftActorContext createActorContext() {
1027         return createActorContext(leaderActor);
1028     }
1029
1030     @Override
1031     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1032         return createActorContext(LEADER_ID, actorRef);
1033     }
1034
1035     private MockRaftActorContext createActorContextWithFollower() {
1036         MockRaftActorContext actorContext = createActorContext();
1037         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1038                 followerActor.path().toString()).build());
1039         return actorContext;
1040     }
1041
1042     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1043         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1044         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1045         configParams.setElectionTimeoutFactor(100000);
1046         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1047         context.setConfigParams(configParams);
1048         context.setPayloadVersion(payloadVersion);
1049         return context;
1050     }
1051
1052     private MockRaftActorContext createFollowerActorContextWithLeader() {
1053         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1054         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1055         followerConfig.setElectionTimeoutFactor(10000);
1056         followerActorContext.setConfigParams(followerConfig);
1057         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1058         return followerActorContext;
1059     }
1060
1061     @Test
1062     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1063         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1064
1065         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1066
1067         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1068
1069         Follower follower = new Follower(followerActorContext);
1070         followerActor.underlyingActor().setBehavior(follower);
1071
1072         Map<String, String> peerAddresses = new HashMap<>();
1073         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1074
1075         leaderActorContext.setPeerAddresses(peerAddresses);
1076
1077         leaderActorContext.getReplicatedLog().removeFrom(0);
1078
1079         //create 3 entries
1080         leaderActorContext.setReplicatedLog(
1081                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1082
1083         leaderActorContext.setCommitIndex(1);
1084
1085         followerActorContext.getReplicatedLog().removeFrom(0);
1086
1087         // follower too has the exact same log entries and has the same commit index
1088         followerActorContext.setReplicatedLog(
1089                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1090
1091         followerActorContext.setCommitIndex(1);
1092
1093         leader = new Leader(leaderActorContext);
1094
1095         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1096
1097         assertEquals(1, appendEntries.getLeaderCommit());
1098         assertEquals(0, appendEntries.getEntries().size());
1099         assertEquals(0, appendEntries.getPrevLogIndex());
1100
1101         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1102                 leaderActor, AppendEntriesReply.class);
1103
1104         assertEquals(2, appendEntriesReply.getLogLastIndex());
1105         assertEquals(1, appendEntriesReply.getLogLastTerm());
1106
1107         // follower returns its next index
1108         assertEquals(2, appendEntriesReply.getLogLastIndex());
1109         assertEquals(1, appendEntriesReply.getLogLastTerm());
1110
1111         follower.close();
1112     }
1113
1114     @Test
1115     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1116         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1117
1118         MockRaftActorContext leaderActorContext = createActorContext();
1119
1120         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1121         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1122
1123         Follower follower = new Follower(followerActorContext);
1124         followerActor.underlyingActor().setBehavior(follower);
1125
1126         Map<String, String> leaderPeerAddresses = new HashMap<>();
1127         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1128
1129         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1130
1131         leaderActorContext.getReplicatedLog().removeFrom(0);
1132
1133         leaderActorContext.setReplicatedLog(
1134                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1135
1136         leaderActorContext.setCommitIndex(1);
1137
1138         followerActorContext.getReplicatedLog().removeFrom(0);
1139
1140         followerActorContext.setReplicatedLog(
1141                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1142
1143         // follower has the same log entries but its commit index > leaders commit index
1144         followerActorContext.setCommitIndex(2);
1145
1146         leader = new Leader(leaderActorContext);
1147
1148         // Initial heartbeat
1149         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1150
1151         assertEquals(1, appendEntries.getLeaderCommit());
1152         assertEquals(0, appendEntries.getEntries().size());
1153         assertEquals(0, appendEntries.getPrevLogIndex());
1154
1155         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1156                 leaderActor, AppendEntriesReply.class);
1157
1158         assertEquals(2, appendEntriesReply.getLogLastIndex());
1159         assertEquals(1, appendEntriesReply.getLogLastTerm());
1160
1161         leaderActor.underlyingActor().setBehavior(follower);
1162         leader.handleMessage(followerActor, appendEntriesReply);
1163
1164         leaderActor.underlyingActor().clear();
1165         followerActor.underlyingActor().clear();
1166
1167         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1168                 TimeUnit.MILLISECONDS);
1169
1170         leader.handleMessage(leaderActor, new SendHeartBeat());
1171
1172         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1173
1174         assertEquals(2, appendEntries.getLeaderCommit());
1175         assertEquals(0, appendEntries.getEntries().size());
1176         assertEquals(2, appendEntries.getPrevLogIndex());
1177
1178         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1179
1180         assertEquals(2, appendEntriesReply.getLogLastIndex());
1181         assertEquals(1, appendEntriesReply.getLogLastTerm());
1182
1183         assertEquals(2, followerActorContext.getCommitIndex());
1184
1185         follower.close();
1186     }
1187
1188     @Test
1189     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1190         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1191
1192         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1193         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1194                 new FiniteDuration(1000, TimeUnit.SECONDS));
1195
1196         leaderActorContext.setReplicatedLog(
1197                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1198         long leaderCommitIndex = 2;
1199         leaderActorContext.setCommitIndex(leaderCommitIndex);
1200         leaderActorContext.setLastApplied(leaderCommitIndex);
1201
1202         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1203         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1204
1205         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1206
1207         followerActorContext.setReplicatedLog(
1208                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1209         followerActorContext.setCommitIndex(0);
1210         followerActorContext.setLastApplied(0);
1211
1212         Follower follower = new Follower(followerActorContext);
1213         followerActor.underlyingActor().setBehavior(follower);
1214
1215         leader = new Leader(leaderActorContext);
1216
1217         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1218         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1219
1220         MessageCollectorActor.clearMessages(followerActor);
1221         MessageCollectorActor.clearMessages(leaderActor);
1222
1223         // Verify initial AppendEntries sent with the leader's current commit index.
1224         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1225         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1226         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1227
1228         leaderActor.underlyingActor().setBehavior(leader);
1229
1230         leader.handleMessage(followerActor, appendEntriesReply);
1231
1232         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1233         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1234
1235         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1236         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1237         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1238
1239         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1240         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1241                 appendEntries.getEntries().get(0).getData());
1242         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1243         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1244                 appendEntries.getEntries().get(1).getData());
1245
1246         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1247         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1248
1249         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1250
1251         ApplyState applyState = applyStateList.get(0);
1252         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1253         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1254         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1255                 applyState.getReplicatedLogEntry().getData());
1256
1257         applyState = applyStateList.get(1);
1258         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1259         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1260         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1261                 applyState.getReplicatedLogEntry().getData());
1262
1263         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1264         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1265     }
1266
1267     @Test
1268     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1269         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1270
1271         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1272         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1273                 new FiniteDuration(1000, TimeUnit.SECONDS));
1274
1275         leaderActorContext.setReplicatedLog(
1276                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1277         long leaderCommitIndex = 1;
1278         leaderActorContext.setCommitIndex(leaderCommitIndex);
1279         leaderActorContext.setLastApplied(leaderCommitIndex);
1280
1281         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1282         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1283
1284         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1285
1286         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1287         followerActorContext.setCommitIndex(-1);
1288         followerActorContext.setLastApplied(-1);
1289
1290         Follower follower = new Follower(followerActorContext);
1291         followerActor.underlyingActor().setBehavior(follower);
1292
1293         leader = new Leader(leaderActorContext);
1294
1295         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1296         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1297
1298         MessageCollectorActor.clearMessages(followerActor);
1299         MessageCollectorActor.clearMessages(leaderActor);
1300
1301         // Verify initial AppendEntries sent with the leader's current commit index.
1302         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1303         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1304         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1305
1306         leaderActor.underlyingActor().setBehavior(leader);
1307
1308         leader.handleMessage(followerActor, appendEntriesReply);
1309
1310         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1311         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1312
1313         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1314         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1315         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1316
1317         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1318         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1319                 appendEntries.getEntries().get(0).getData());
1320         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1321         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1322                 appendEntries.getEntries().get(1).getData());
1323
1324         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1325         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1326
1327         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1328
1329         ApplyState applyState = applyStateList.get(0);
1330         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1331         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1332         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1333                 applyState.getReplicatedLogEntry().getData());
1334
1335         applyState = applyStateList.get(1);
1336         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1337         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1338         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1339                 applyState.getReplicatedLogEntry().getData());
1340
1341         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1342         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1343     }
1344
1345     @Test
1346     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1347         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1348
1349         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1350         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1351                 new FiniteDuration(1000, TimeUnit.SECONDS));
1352
1353         leaderActorContext.setReplicatedLog(
1354                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1355         long leaderCommitIndex = 1;
1356         leaderActorContext.setCommitIndex(leaderCommitIndex);
1357         leaderActorContext.setLastApplied(leaderCommitIndex);
1358
1359         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1360         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1361
1362         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1363
1364         followerActorContext.setReplicatedLog(
1365                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1366         followerActorContext.setCommitIndex(-1);
1367         followerActorContext.setLastApplied(-1);
1368
1369         Follower follower = new Follower(followerActorContext);
1370         followerActor.underlyingActor().setBehavior(follower);
1371
1372         leader = new Leader(leaderActorContext);
1373
1374         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1375         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1376
1377         MessageCollectorActor.clearMessages(followerActor);
1378         MessageCollectorActor.clearMessages(leaderActor);
1379
1380         // Verify initial AppendEntries sent with the leader's current commit index.
1381         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1382         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1383         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1384
1385         leaderActor.underlyingActor().setBehavior(leader);
1386
1387         leader.handleMessage(followerActor, appendEntriesReply);
1388
1389         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1390         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1391
1392         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1393         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1394         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1395
1396         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1397         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1398         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1399                 appendEntries.getEntries().get(0).getData());
1400         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1401         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1402         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1403                 appendEntries.getEntries().get(1).getData());
1404
1405         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1406         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1407
1408         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1409
1410         ApplyState applyState = applyStateList.get(0);
1411         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1412         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1413         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1414                 applyState.getReplicatedLogEntry().getData());
1415
1416         applyState = applyStateList.get(1);
1417         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1418         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1419         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1420                 applyState.getReplicatedLogEntry().getData());
1421
1422         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1423         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1424         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1425     }
1426
1427     @Test
1428     public void testHandleAppendEntriesReplySuccess() throws Exception {
1429         logStart("testHandleAppendEntriesReplySuccess");
1430
1431         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1432
1433         leaderActorContext.setReplicatedLog(
1434                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1435
1436         leaderActorContext.setCommitIndex(1);
1437         leaderActorContext.setLastApplied(1);
1438         leaderActorContext.getTermInformation().update(1, "leader");
1439
1440         leader = new Leader(leaderActorContext);
1441
1442         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1443
1444         short payloadVersion = 5;
1445         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1446
1447         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1448
1449         assertEquals(RaftState.Leader, raftActorBehavior.state());
1450
1451         assertEquals(2, leaderActorContext.getCommitIndex());
1452
1453         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1454                 leaderActor, ApplyJournalEntries.class);
1455
1456         assertEquals(2, leaderActorContext.getLastApplied());
1457
1458         assertEquals(2, applyJournalEntries.getToIndex());
1459
1460         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1461                 ApplyState.class);
1462
1463         assertEquals(1,applyStateList.size());
1464
1465         ApplyState applyState = applyStateList.get(0);
1466
1467         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1468
1469         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1470         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1471     }
1472
1473     @Test
1474     public void testHandleAppendEntriesReplyUnknownFollower(){
1475         logStart("testHandleAppendEntriesReplyUnknownFollower");
1476
1477         MockRaftActorContext leaderActorContext = createActorContext();
1478
1479         leader = new Leader(leaderActorContext);
1480
1481         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1482
1483         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1484
1485         assertEquals(RaftState.Leader, raftActorBehavior.state());
1486     }
1487
1488     @Test
1489     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1490         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1491
1492         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1493         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1494                 new FiniteDuration(1000, TimeUnit.SECONDS));
1495         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2);
1496
1497         leaderActorContext.setReplicatedLog(
1498                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1499         long leaderCommitIndex = 3;
1500         leaderActorContext.setCommitIndex(leaderCommitIndex);
1501         leaderActorContext.setLastApplied(leaderCommitIndex);
1502
1503         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1504         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1505         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1506         ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1507
1508         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1509
1510         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1511         followerActorContext.setCommitIndex(-1);
1512         followerActorContext.setLastApplied(-1);
1513
1514         Follower follower = new Follower(followerActorContext);
1515         followerActor.underlyingActor().setBehavior(follower);
1516
1517         leader = new Leader(leaderActorContext);
1518
1519         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1520         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1521
1522         MessageCollectorActor.clearMessages(followerActor);
1523         MessageCollectorActor.clearMessages(leaderActor);
1524
1525         // Verify initial AppendEntries sent with the leader's current commit index.
1526         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1527         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1528         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1529
1530         leaderActor.underlyingActor().setBehavior(leader);
1531
1532         leader.handleMessage(followerActor, appendEntriesReply);
1533
1534         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1535         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1536
1537         appendEntries = appendEntriesList.get(0);
1538         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1539         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1540         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1541
1542         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1543         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1544                 appendEntries.getEntries().get(0).getData());
1545         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1546         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1547                 appendEntries.getEntries().get(1).getData());
1548
1549         appendEntries = appendEntriesList.get(1);
1550         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1551         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1552         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1553
1554         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1555         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1556                 appendEntries.getEntries().get(0).getData());
1557         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1558         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1559                 appendEntries.getEntries().get(1).getData());
1560
1561         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1562         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1563
1564         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1565
1566         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1567         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1568     }
1569
1570     @Test
1571     public void testHandleRequestVoteReply(){
1572         logStart("testHandleRequestVoteReply");
1573
1574         MockRaftActorContext leaderActorContext = createActorContext();
1575
1576         leader = new Leader(leaderActorContext);
1577
1578         // Should be a no-op.
1579         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1580                 new RequestVoteReply(1, true));
1581
1582         assertEquals(RaftState.Leader, raftActorBehavior.state());
1583
1584         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1585
1586         assertEquals(RaftState.Leader, raftActorBehavior.state());
1587     }
1588
1589     @Test
1590     public void testIsolatedLeaderCheckNoFollowers() {
1591         logStart("testIsolatedLeaderCheckNoFollowers");
1592
1593         MockRaftActorContext leaderActorContext = createActorContext();
1594
1595         leader = new Leader(leaderActorContext);
1596         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1597         Assert.assertTrue(behavior instanceof Leader);
1598     }
1599
1600     @Test
1601     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1602         logStart("testIsolatedLeaderCheckTwoFollowers");
1603
1604         new JavaTestKit(getSystem()) {{
1605
1606             ActorRef followerActor1 = getTestActor();
1607             ActorRef followerActor2 = getTestActor();
1608
1609             MockRaftActorContext leaderActorContext = createActorContext();
1610
1611             Map<String, String> peerAddresses = new HashMap<>();
1612             peerAddresses.put("follower-1", followerActor1.path().toString());
1613             peerAddresses.put("follower-2", followerActor2.path().toString());
1614
1615             leaderActorContext.setPeerAddresses(peerAddresses);
1616
1617             leader = new Leader(leaderActorContext);
1618
1619             leader.markFollowerActive("follower-1");
1620             leader.markFollowerActive("follower-2");
1621             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1622             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1623                 behavior instanceof Leader);
1624
1625             // kill 1 follower and verify if that got killed
1626             final JavaTestKit probe = new JavaTestKit(getSystem());
1627             probe.watch(followerActor1);
1628             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1629             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1630             assertEquals(termMsg1.getActor(), followerActor1);
1631
1632             leader.markFollowerInActive("follower-1");
1633             leader.markFollowerActive("follower-2");
1634             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1635             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1636                 behavior instanceof Leader);
1637
1638             // kill 2nd follower and leader should change to Isolated leader
1639             followerActor2.tell(PoisonPill.getInstance(), null);
1640             probe.watch(followerActor2);
1641             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1642             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1643             assertEquals(termMsg2.getActor(), followerActor2);
1644
1645             leader.markFollowerInActive("follower-2");
1646             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1647             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1648                 behavior instanceof IsolatedLeader);
1649         }};
1650     }
1651
1652     @Test
1653     public void testLaggingFollowerStarvation() throws Exception {
1654         logStart("testLaggingFollowerStarvation");
1655         new JavaTestKit(getSystem()) {{
1656             String leaderActorId = actorFactory.generateActorId("leader");
1657             String follower1ActorId = actorFactory.generateActorId("follower");
1658             String follower2ActorId = actorFactory.generateActorId("follower");
1659
1660             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1661                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1662             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1663             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1664
1665             MockRaftActorContext leaderActorContext =
1666                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1667
1668             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1669             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1670             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1671
1672             leaderActorContext.setConfigParams(configParams);
1673
1674             leaderActorContext.setReplicatedLog(
1675                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1676
1677             Map<String, String> peerAddresses = new HashMap<>();
1678             peerAddresses.put(follower1ActorId,
1679                     follower1Actor.path().toString());
1680             peerAddresses.put(follower2ActorId,
1681                     follower2Actor.path().toString());
1682
1683             leaderActorContext.setPeerAddresses(peerAddresses);
1684             leaderActorContext.getTermInformation().update(1, leaderActorId);
1685
1686             RaftActorBehavior leader = createBehavior(leaderActorContext);
1687
1688             leaderActor.underlyingActor().setBehavior(leader);
1689
1690             for(int i=1;i<6;i++) {
1691                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1692                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1693                 assertTrue(newBehavior == leader);
1694                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1695             }
1696
1697             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1698             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1699
1700             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1701                     heartbeats.size() > 1);
1702
1703             // Check if follower-2 got AppendEntries during this time and was not starved
1704             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1705
1706             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1707                     appendEntries.size() > 1);
1708
1709         }};
1710     }
1711
1712     @Override
1713     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1714             ActorRef actorRef, RaftRPC rpc) throws Exception {
1715         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1716         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1717     }
1718
1719     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1720
1721         private final long electionTimeOutIntervalMillis;
1722         private final int snapshotChunkSize;
1723
1724         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1725             super();
1726             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1727             this.snapshotChunkSize = snapshotChunkSize;
1728         }
1729
1730         @Override
1731         public FiniteDuration getElectionTimeOutInterval() {
1732             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1733         }
1734
1735         @Override
1736         public int getSnapshotChunkSize() {
1737             return snapshotChunkSize;
1738         }
1739     }
1740 }