Bug 2787: Batch AppendEntries to speed up follower sync
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
35 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
38 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
44 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
45 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
46 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
47 import scala.concurrent.duration.FiniteDuration;
48
49 public class LeaderTest extends AbstractLeaderTest {
50
51     static final String FOLLOWER_ID = "follower";
52     public static final String LEADER_ID = "leader";
53
54     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
55             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
56
57     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
58             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
59
60     private Leader leader;
61
62     @Override
63     @After
64     public void tearDown() throws Exception {
65         if(leader != null) {
66             leader.close();
67         }
68
69         super.tearDown();
70     }
71
72     @Test
73     public void testHandleMessageForUnknownMessage() throws Exception {
74         logStart("testHandleMessageForUnknownMessage");
75
76         leader = new Leader(createActorContext());
77
78         // handle message should return the Leader state when it receives an
79         // unknown message
80         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
81         Assert.assertTrue(behavior instanceof Leader);
82     }
83
84     @Test
85     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
86         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
87
88         MockRaftActorContext actorContext = createActorContextWithFollower();
89         short payloadVersion = (short)5;
90         actorContext.setPayloadVersion(payloadVersion);
91
92         long term = 1;
93         actorContext.getTermInformation().update(term, "");
94
95         leader = new Leader(actorContext);
96
97         // Leader should send an immediate heartbeat with no entries as follower is inactive.
98         long lastIndex = actorContext.getReplicatedLog().lastIndex();
99         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
100         assertEquals("getTerm", term, appendEntries.getTerm());
101         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
102         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
103         assertEquals("Entries size", 0, appendEntries.getEntries().size());
104         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
105
106         // The follower would normally reply - simulate that explicitly here.
107         leader.handleMessage(followerActor, new AppendEntriesReply(
108                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
109         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
110
111         followerActor.underlyingActor().clear();
112
113         // Sleep for the heartbeat interval so AppendEntries is sent.
114         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
115                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
116
117         leader.handleMessage(leaderActor, new SendHeartBeat());
118
119         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
120         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
121         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
122         assertEquals("Entries size", 1, appendEntries.getEntries().size());
123         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
124         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
125         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
126     }
127
128
129     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
130         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
131         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
132                 1, index, payload);
133         actorContext.getReplicatedLog().append(newEntry);
134         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
135     }
136
137     @Test
138     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
139         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
140
141         MockRaftActorContext actorContext = createActorContextWithFollower();
142
143         long term = 1;
144         actorContext.getTermInformation().update(term, "");
145
146         leader = new Leader(actorContext);
147
148         // Leader will send an immediate heartbeat - ignore it.
149         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
150
151         // The follower would normally reply - simulate that explicitly here.
152         long lastIndex = actorContext.getReplicatedLog().lastIndex();
153         leader.handleMessage(followerActor, new AppendEntriesReply(
154                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
155         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
156
157         followerActor.underlyingActor().clear();
158
159         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
160
161         // State should not change
162         assertTrue(raftBehavior instanceof Leader);
163
164         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
165         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
166         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
167         assertEquals("Entries size", 1, appendEntries.getEntries().size());
168         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
169         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
170         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
171     }
172
173     @Test
174     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
175         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
176
177         MockRaftActorContext actorContext = createActorContextWithFollower();
178         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
179             @Override
180             public FiniteDuration getHeartBeatInterval() {
181                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
182             }
183         });
184
185         long term = 1;
186         actorContext.getTermInformation().update(term, "");
187
188         leader = new Leader(actorContext);
189
190         // Leader will send an immediate heartbeat - ignore it.
191         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
192
193         // The follower would normally reply - simulate that explicitly here.
194         long lastIndex = actorContext.getReplicatedLog().lastIndex();
195         leader.handleMessage(followerActor, new AppendEntriesReply(
196                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
197         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
198
199         followerActor.underlyingActor().clear();
200
201         for(int i=0;i<5;i++) {
202             sendReplicate(actorContext, lastIndex+i+1);
203         }
204
205         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
206         // We expect only 1 message to be sent because of two reasons,
207         // - an append entries reply was not received
208         // - the heartbeat interval has not expired
209         // In this scenario if multiple messages are sent they would likely be duplicates
210         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
211     }
212
213     @Test
214     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
215         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
216
217         MockRaftActorContext actorContext = createActorContextWithFollower();
218         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
219             @Override
220             public FiniteDuration getHeartBeatInterval() {
221                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
222             }
223         });
224
225         long term = 1;
226         actorContext.getTermInformation().update(term, "");
227
228         leader = new Leader(actorContext);
229
230         // Leader will send an immediate heartbeat - ignore it.
231         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
232
233         // The follower would normally reply - simulate that explicitly here.
234         long lastIndex = actorContext.getReplicatedLog().lastIndex();
235         leader.handleMessage(followerActor, new AppendEntriesReply(
236                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
237         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
238
239         followerActor.underlyingActor().clear();
240
241         for(int i=0;i<3;i++) {
242             sendReplicate(actorContext, lastIndex+i+1);
243             leader.handleMessage(followerActor, new AppendEntriesReply(
244                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
245
246         }
247
248         for(int i=3;i<5;i++) {
249             sendReplicate(actorContext, lastIndex + i + 1);
250         }
251
252         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
253         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
254         // get sent to the follower - but not the 5th
255         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
256
257         for(int i=0;i<4;i++) {
258             long expected = allMessages.get(i).getEntries().get(0).getIndex();
259             assertEquals(expected, i+2);
260         }
261     }
262
263     @Test
264     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
265         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
266
267         MockRaftActorContext actorContext = createActorContextWithFollower();
268         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
269             @Override
270             public FiniteDuration getHeartBeatInterval() {
271                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
272             }
273         });
274
275         long term = 1;
276         actorContext.getTermInformation().update(term, "");
277
278         leader = new Leader(actorContext);
279
280         // Leader will send an immediate heartbeat - ignore it.
281         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
282
283         // The follower would normally reply - simulate that explicitly here.
284         long lastIndex = actorContext.getReplicatedLog().lastIndex();
285         leader.handleMessage(followerActor, new AppendEntriesReply(
286                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
287         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
288
289         followerActor.underlyingActor().clear();
290
291         sendReplicate(actorContext, lastIndex+1);
292
293         // Wait slightly longer than heartbeat duration
294         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
295
296         leader.handleMessage(leaderActor, new SendHeartBeat());
297
298         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
299         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
300
301         assertEquals(1, allMessages.get(0).getEntries().size());
302         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
303         assertEquals(1, allMessages.get(1).getEntries().size());
304         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
305
306     }
307
308     @Test
309     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
310         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
311
312         MockRaftActorContext actorContext = createActorContextWithFollower();
313         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
314             @Override
315             public FiniteDuration getHeartBeatInterval() {
316                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
317             }
318         });
319
320         long term = 1;
321         actorContext.getTermInformation().update(term, "");
322
323         leader = new Leader(actorContext);
324
325         // Leader will send an immediate heartbeat - ignore it.
326         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
327
328         // The follower would normally reply - simulate that explicitly here.
329         long lastIndex = actorContext.getReplicatedLog().lastIndex();
330         leader.handleMessage(followerActor, new AppendEntriesReply(
331                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
332         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
333
334         followerActor.underlyingActor().clear();
335
336         for(int i=0;i<3;i++) {
337             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
338             leader.handleMessage(leaderActor, new SendHeartBeat());
339         }
340
341         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
342         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
343     }
344
345     @Test
346     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
347         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
348
349         MockRaftActorContext actorContext = createActorContextWithFollower();
350         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
351             @Override
352             public FiniteDuration getHeartBeatInterval() {
353                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
354             }
355         });
356
357         long term = 1;
358         actorContext.getTermInformation().update(term, "");
359
360         leader = new Leader(actorContext);
361
362         // Leader will send an immediate heartbeat - ignore it.
363         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
364
365         // The follower would normally reply - simulate that explicitly here.
366         long lastIndex = actorContext.getReplicatedLog().lastIndex();
367         leader.handleMessage(followerActor, new AppendEntriesReply(
368                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
369         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
370
371         followerActor.underlyingActor().clear();
372
373         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
374         leader.handleMessage(leaderActor, new SendHeartBeat());
375         sendReplicate(actorContext, lastIndex+1);
376
377         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
378         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
379
380         assertEquals(0, allMessages.get(0).getEntries().size());
381         assertEquals(1, allMessages.get(1).getEntries().size());
382     }
383
384
385     @Test
386     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
387         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
388
389         MockRaftActorContext actorContext = createActorContext();
390
391         leader = new Leader(actorContext);
392
393         actorContext.setLastApplied(0);
394
395         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
396         long term = actorContext.getTermInformation().getCurrentTerm();
397         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
398                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
399
400         actorContext.getReplicatedLog().append(newEntry);
401
402         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
403                 new Replicate(leaderActor, "state-id", newEntry));
404
405         // State should not change
406         assertTrue(raftBehavior instanceof Leader);
407
408         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
409
410         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
411         // one since lastApplied state is 0.
412         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
413                 leaderActor, ApplyState.class);
414         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
415
416         for(int i = 0; i <= newLogIndex - 1; i++ ) {
417             ApplyState applyState = applyStateList.get(i);
418             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
419             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
420         }
421
422         ApplyState last = applyStateList.get((int) newLogIndex - 1);
423         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
424         assertEquals("getIdentifier", "state-id", last.getIdentifier());
425     }
426
427     @Test
428     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
429         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
430
431         MockRaftActorContext actorContext = createActorContextWithFollower();
432
433         Map<String, String> leadersSnapshot = new HashMap<>();
434         leadersSnapshot.put("1", "A");
435         leadersSnapshot.put("2", "B");
436         leadersSnapshot.put("3", "C");
437
438         //clears leaders log
439         actorContext.getReplicatedLog().removeFrom(0);
440
441         final int followersLastIndex = 2;
442         final int snapshotIndex = 3;
443         final int newEntryIndex = 4;
444         final int snapshotTerm = 1;
445         final int currentTerm = 2;
446
447         // set the snapshot variables in replicatedlog
448         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
449         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
450         actorContext.setCommitIndex(followersLastIndex);
451         //set follower timeout to 2 mins, helps during debugging
452         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
453
454         leader = new Leader(actorContext);
455
456         // new entry
457         ReplicatedLogImplEntry entry =
458                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
459                         new MockRaftActorContext.MockPayload("D"));
460
461         //update follower timestamp
462         leader.markFollowerActive(FOLLOWER_ID);
463
464         ByteString bs = toByteString(leadersSnapshot);
465         leader.setSnapshot(Optional.of(bs));
466         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
467         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
468
469         //send first chunk and no InstallSnapshotReply received yet
470         fts.getNextChunk();
471         fts.incrementChunkIndex();
472
473         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
474                 TimeUnit.MILLISECONDS);
475
476         leader.handleMessage(leaderActor, new SendHeartBeat());
477
478         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
479
480         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
481
482         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
483
484         //InstallSnapshotReply received
485         fts.markSendStatus(true);
486
487         leader.handleMessage(leaderActor, new SendHeartBeat());
488
489         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
490
491         assertEquals(snapshotIndex, is.getLastIncludedIndex());
492     }
493
494     @Test
495     public void testSendAppendEntriesSnapshotScenario() throws Exception {
496         logStart("testSendAppendEntriesSnapshotScenario");
497
498         MockRaftActorContext actorContext = createActorContextWithFollower();
499
500         Map<String, String> leadersSnapshot = new HashMap<>();
501         leadersSnapshot.put("1", "A");
502         leadersSnapshot.put("2", "B");
503         leadersSnapshot.put("3", "C");
504
505         //clears leaders log
506         actorContext.getReplicatedLog().removeFrom(0);
507
508         final int followersLastIndex = 2;
509         final int snapshotIndex = 3;
510         final int newEntryIndex = 4;
511         final int snapshotTerm = 1;
512         final int currentTerm = 2;
513
514         // set the snapshot variables in replicatedlog
515         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
516         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
517         actorContext.setCommitIndex(followersLastIndex);
518
519         leader = new Leader(actorContext);
520
521         // Leader will send an immediate heartbeat - ignore it.
522         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
523
524         // new entry
525         ReplicatedLogImplEntry entry =
526                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
527                         new MockRaftActorContext.MockPayload("D"));
528
529         actorContext.getReplicatedLog().append(entry);
530
531         //update follower timestamp
532         leader.markFollowerActive(FOLLOWER_ID);
533
534         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
535         RaftActorBehavior raftBehavior = leader.handleMessage(
536                 leaderActor, new Replicate(null, "state-id", entry));
537
538         assertTrue(raftBehavior instanceof Leader);
539
540         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
541     }
542
543     @Test
544     public void testInitiateInstallSnapshot() throws Exception {
545         logStart("testInitiateInstallSnapshot");
546
547         MockRaftActorContext actorContext = createActorContextWithFollower();
548
549         Map<String, String> leadersSnapshot = new HashMap<>();
550         leadersSnapshot.put("1", "A");
551         leadersSnapshot.put("2", "B");
552         leadersSnapshot.put("3", "C");
553
554         //clears leaders log
555         actorContext.getReplicatedLog().removeFrom(0);
556
557         final int followersLastIndex = 2;
558         final int snapshotIndex = 3;
559         final int newEntryIndex = 4;
560         final int snapshotTerm = 1;
561         final int currentTerm = 2;
562
563         // set the snapshot variables in replicatedlog
564         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
565         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
566         actorContext.setLastApplied(3);
567         actorContext.setCommitIndex(followersLastIndex);
568
569         leader = new Leader(actorContext);
570
571         // Leader will send an immediate heartbeat - ignore it.
572         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
573
574         // set the snapshot as absent and check if capture-snapshot is invoked.
575         leader.setSnapshot(Optional.<ByteString>absent());
576
577         // new entry
578         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
579                 new MockRaftActorContext.MockPayload("D"));
580
581         actorContext.getReplicatedLog().append(entry);
582
583         //update follower timestamp
584         leader.markFollowerActive(FOLLOWER_ID);
585
586         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
587
588         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
589
590         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
591
592         assertTrue(cs.isInstallSnapshotInitiated());
593         assertEquals(3, cs.getLastAppliedIndex());
594         assertEquals(1, cs.getLastAppliedTerm());
595         assertEquals(4, cs.getLastIndex());
596         assertEquals(2, cs.getLastTerm());
597
598         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
599         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
600
601         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
602     }
603
604     @Test
605     public void testInstallSnapshot() throws Exception {
606         logStart("testInstallSnapshot");
607
608         MockRaftActorContext actorContext = createActorContextWithFollower();
609
610         Map<String, String> leadersSnapshot = new HashMap<>();
611         leadersSnapshot.put("1", "A");
612         leadersSnapshot.put("2", "B");
613         leadersSnapshot.put("3", "C");
614
615         //clears leaders log
616         actorContext.getReplicatedLog().removeFrom(0);
617
618         final int followersLastIndex = 2;
619         final int snapshotIndex = 3;
620         final int snapshotTerm = 1;
621         final int currentTerm = 2;
622
623         // set the snapshot variables in replicatedlog
624         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
625         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
626         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
627         actorContext.setCommitIndex(followersLastIndex);
628
629         leader = new Leader(actorContext);
630
631         // Ignore initial heartbeat.
632         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
633
634         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
635                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
636
637         assertTrue(raftBehavior instanceof Leader);
638
639         // check if installsnapshot gets called with the correct values.
640
641         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
642
643         assertNotNull(installSnapshot.getData());
644         assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
645         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
646
647         assertEquals(currentTerm, installSnapshot.getTerm());
648     }
649
650     @Test
651     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
652         logStart("testHandleInstallSnapshotReplyLastChunk");
653
654         MockRaftActorContext actorContext = createActorContextWithFollower();
655
656         final int followersLastIndex = 2;
657         final int snapshotIndex = 3;
658         final int snapshotTerm = 1;
659         final int currentTerm = 2;
660
661         actorContext.setCommitIndex(followersLastIndex);
662
663         leader = new Leader(actorContext);
664
665         // Ignore initial heartbeat.
666         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
667
668         Map<String, String> leadersSnapshot = new HashMap<>();
669         leadersSnapshot.put("1", "A");
670         leadersSnapshot.put("2", "B");
671         leadersSnapshot.put("3", "C");
672
673         // set the snapshot variables in replicatedlog
674
675         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
676         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
677         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
678
679         ByteString bs = toByteString(leadersSnapshot);
680         leader.setSnapshot(Optional.of(bs));
681         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
682         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
683         while(!fts.isLastChunk(fts.getChunkIndex())) {
684             fts.getNextChunk();
685             fts.incrementChunkIndex();
686         }
687
688         //clears leaders log
689         actorContext.getReplicatedLog().removeFrom(0);
690
691         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
692                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
693
694         assertTrue(raftBehavior instanceof Leader);
695
696         assertEquals(0, leader.followerSnapshotSize());
697         assertEquals(1, leader.followerLogSize());
698         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
699         assertNotNull(fli);
700         assertEquals(snapshotIndex, fli.getMatchIndex());
701         assertEquals(snapshotIndex, fli.getMatchIndex());
702         assertEquals(snapshotIndex + 1, fli.getNextIndex());
703     }
704
705     @Test
706     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
707         logStart("testSendSnapshotfromInstallSnapshotReply");
708
709         MockRaftActorContext actorContext = createActorContextWithFollower();
710
711         final int followersLastIndex = 2;
712         final int snapshotIndex = 3;
713         final int snapshotTerm = 1;
714         final int currentTerm = 2;
715
716         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
717             @Override
718             public int getSnapshotChunkSize() {
719                 return 50;
720             }
721         };
722         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
723         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
724
725         actorContext.setConfigParams(configParams);
726         actorContext.setCommitIndex(followersLastIndex);
727
728         leader = new Leader(actorContext);
729
730         Map<String, String> leadersSnapshot = new HashMap<>();
731         leadersSnapshot.put("1", "A");
732         leadersSnapshot.put("2", "B");
733         leadersSnapshot.put("3", "C");
734
735         // set the snapshot variables in replicatedlog
736         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
737         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
738         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
739
740         ByteString bs = toByteString(leadersSnapshot);
741         leader.setSnapshot(Optional.of(bs));
742
743         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
744
745         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
746
747         assertEquals(1, installSnapshot.getChunkIndex());
748         assertEquals(3, installSnapshot.getTotalChunks());
749
750         followerActor.underlyingActor().clear();
751         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
752                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
753
754         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
755
756         assertEquals(2, installSnapshot.getChunkIndex());
757         assertEquals(3, installSnapshot.getTotalChunks());
758
759         followerActor.underlyingActor().clear();
760         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
761                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
762
763         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
764
765         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
766         followerActor.underlyingActor().clear();
767         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
768                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
769
770         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
771
772         Assert.assertNull(installSnapshot);
773     }
774
775
776     @Test
777     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
778         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
779
780         MockRaftActorContext actorContext = createActorContextWithFollower();
781
782         final int followersLastIndex = 2;
783         final int snapshotIndex = 3;
784         final int snapshotTerm = 1;
785         final int currentTerm = 2;
786
787         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
788             @Override
789             public int getSnapshotChunkSize() {
790                 return 50;
791             }
792         });
793
794         actorContext.setCommitIndex(followersLastIndex);
795
796         leader = new Leader(actorContext);
797
798         Map<String, String> leadersSnapshot = new HashMap<>();
799         leadersSnapshot.put("1", "A");
800         leadersSnapshot.put("2", "B");
801         leadersSnapshot.put("3", "C");
802
803         // set the snapshot variables in replicatedlog
804         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
805         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
806         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
807
808         ByteString bs = toByteString(leadersSnapshot);
809         leader.setSnapshot(Optional.of(bs));
810
811         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
812         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
813
814         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
815
816         assertEquals(1, installSnapshot.getChunkIndex());
817         assertEquals(3, installSnapshot.getTotalChunks());
818
819         followerActor.underlyingActor().clear();
820
821         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
822                 FOLLOWER_ID, -1, false));
823
824         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
825                 TimeUnit.MILLISECONDS);
826
827         leader.handleMessage(leaderActor, new SendHeartBeat());
828
829         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
830
831         assertEquals(1, installSnapshot.getChunkIndex());
832         assertEquals(3, installSnapshot.getTotalChunks());
833     }
834
835     @Test
836     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
837         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
838
839         MockRaftActorContext actorContext = createActorContextWithFollower();
840
841         final int followersLastIndex = 2;
842         final int snapshotIndex = 3;
843         final int snapshotTerm = 1;
844         final int currentTerm = 2;
845
846         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
847             @Override
848             public int getSnapshotChunkSize() {
849                 return 50;
850             }
851         });
852
853         actorContext.setCommitIndex(followersLastIndex);
854
855         leader = new Leader(actorContext);
856
857         Map<String, String> leadersSnapshot = new HashMap<>();
858         leadersSnapshot.put("1", "A");
859         leadersSnapshot.put("2", "B");
860         leadersSnapshot.put("3", "C");
861
862         // set the snapshot variables in replicatedlog
863         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
864         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
865         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
866
867         ByteString bs = toByteString(leadersSnapshot);
868         leader.setSnapshot(Optional.of(bs));
869
870         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
871
872         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
873
874         assertEquals(1, installSnapshot.getChunkIndex());
875         assertEquals(3, installSnapshot.getTotalChunks());
876         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
877
878         int hashCode = installSnapshot.getData().hashCode();
879
880         followerActor.underlyingActor().clear();
881
882         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
883                 FOLLOWER_ID, 1, true));
884
885         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
886
887         assertEquals(2, installSnapshot.getChunkIndex());
888         assertEquals(3, installSnapshot.getTotalChunks());
889         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
890     }
891
892     @Test
893     public void testFollowerToSnapshotLogic() {
894         logStart("testFollowerToSnapshotLogic");
895
896         MockRaftActorContext actorContext = createActorContext();
897
898         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
899             @Override
900             public int getSnapshotChunkSize() {
901                 return 50;
902             }
903         });
904
905         leader = new Leader(actorContext);
906
907         Map<String, String> leadersSnapshot = new HashMap<>();
908         leadersSnapshot.put("1", "A");
909         leadersSnapshot.put("2", "B");
910         leadersSnapshot.put("3", "C");
911
912         ByteString bs = toByteString(leadersSnapshot);
913         byte[] barray = bs.toByteArray();
914
915         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
916         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
917
918         assertEquals(bs.size(), barray.length);
919
920         int chunkIndex=0;
921         for (int i=0; i < barray.length; i = i + 50) {
922             int j = i + 50;
923             chunkIndex++;
924
925             if (i + 50 > barray.length) {
926                 j = barray.length;
927             }
928
929             ByteString chunk = fts.getNextChunk();
930             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
931             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
932
933             fts.markSendStatus(true);
934             if (!fts.isLastChunk(chunkIndex)) {
935                 fts.incrementChunkIndex();
936             }
937         }
938
939         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
940     }
941
942     @Override protected RaftActorBehavior createBehavior(
943         RaftActorContext actorContext) {
944         return new Leader(actorContext);
945     }
946
947     @Override
948     protected MockRaftActorContext createActorContext() {
949         return createActorContext(leaderActor);
950     }
951
952     @Override
953     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
954         return createActorContext(LEADER_ID, actorRef);
955     }
956
957     private MockRaftActorContext createActorContextWithFollower() {
958         MockRaftActorContext actorContext = createActorContext();
959         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
960                 followerActor.path().toString()).build());
961         return actorContext;
962     }
963
964     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
965         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
966         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
967         configParams.setElectionTimeoutFactor(100000);
968         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
969         context.setConfigParams(configParams);
970         return context;
971     }
972
973     private MockRaftActorContext createFollowerActorContextWithLeader() {
974         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
975         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
976         followerConfig.setElectionTimeoutFactor(10000);
977         followerActorContext.setConfigParams(followerConfig);
978         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
979         return followerActorContext;
980     }
981
982     @Test
983     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
984         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
985
986         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
987
988         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
989
990         Follower follower = new Follower(followerActorContext);
991         followerActor.underlyingActor().setBehavior(follower);
992
993         Map<String, String> peerAddresses = new HashMap<>();
994         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
995
996         leaderActorContext.setPeerAddresses(peerAddresses);
997
998         leaderActorContext.getReplicatedLog().removeFrom(0);
999
1000         //create 3 entries
1001         leaderActorContext.setReplicatedLog(
1002                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1003
1004         leaderActorContext.setCommitIndex(1);
1005
1006         followerActorContext.getReplicatedLog().removeFrom(0);
1007
1008         // follower too has the exact same log entries and has the same commit index
1009         followerActorContext.setReplicatedLog(
1010                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1011
1012         followerActorContext.setCommitIndex(1);
1013
1014         leader = new Leader(leaderActorContext);
1015
1016         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1017
1018         assertEquals(1, appendEntries.getLeaderCommit());
1019         assertEquals(0, appendEntries.getEntries().size());
1020         assertEquals(0, appendEntries.getPrevLogIndex());
1021
1022         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1023                 leaderActor, AppendEntriesReply.class);
1024
1025         assertEquals(2, appendEntriesReply.getLogLastIndex());
1026         assertEquals(1, appendEntriesReply.getLogLastTerm());
1027
1028         // follower returns its next index
1029         assertEquals(2, appendEntriesReply.getLogLastIndex());
1030         assertEquals(1, appendEntriesReply.getLogLastTerm());
1031
1032         follower.close();
1033     }
1034
1035     @Test
1036     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1037         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1038
1039         MockRaftActorContext leaderActorContext = createActorContext();
1040
1041         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1042         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1043
1044         Follower follower = new Follower(followerActorContext);
1045         followerActor.underlyingActor().setBehavior(follower);
1046
1047         Map<String, String> leaderPeerAddresses = new HashMap<>();
1048         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1049
1050         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1051
1052         leaderActorContext.getReplicatedLog().removeFrom(0);
1053
1054         leaderActorContext.setReplicatedLog(
1055                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1056
1057         leaderActorContext.setCommitIndex(1);
1058
1059         followerActorContext.getReplicatedLog().removeFrom(0);
1060
1061         followerActorContext.setReplicatedLog(
1062                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1063
1064         // follower has the same log entries but its commit index > leaders commit index
1065         followerActorContext.setCommitIndex(2);
1066
1067         leader = new Leader(leaderActorContext);
1068
1069         // Initial heartbeat
1070         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1071
1072         assertEquals(1, appendEntries.getLeaderCommit());
1073         assertEquals(0, appendEntries.getEntries().size());
1074         assertEquals(0, appendEntries.getPrevLogIndex());
1075
1076         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1077                 leaderActor, AppendEntriesReply.class);
1078
1079         assertEquals(2, appendEntriesReply.getLogLastIndex());
1080         assertEquals(1, appendEntriesReply.getLogLastTerm());
1081
1082         leaderActor.underlyingActor().setBehavior(follower);
1083         leader.handleMessage(followerActor, appendEntriesReply);
1084
1085         leaderActor.underlyingActor().clear();
1086         followerActor.underlyingActor().clear();
1087
1088         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1089                 TimeUnit.MILLISECONDS);
1090
1091         leader.handleMessage(leaderActor, new SendHeartBeat());
1092
1093         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1094
1095         assertEquals(2, appendEntries.getLeaderCommit());
1096         assertEquals(0, appendEntries.getEntries().size());
1097         assertEquals(2, appendEntries.getPrevLogIndex());
1098
1099         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1100
1101         assertEquals(2, appendEntriesReply.getLogLastIndex());
1102         assertEquals(1, appendEntriesReply.getLogLastTerm());
1103
1104         assertEquals(2, followerActorContext.getCommitIndex());
1105
1106         follower.close();
1107     }
1108
1109     @Test
1110     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1111         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1112
1113         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1114         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1115                 new FiniteDuration(1000, TimeUnit.SECONDS));
1116
1117         leaderActorContext.setReplicatedLog(
1118                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1119         long leaderCommitIndex = 2;
1120         leaderActorContext.setCommitIndex(leaderCommitIndex);
1121         leaderActorContext.setLastApplied(leaderCommitIndex);
1122
1123         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1124         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1125
1126         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1127
1128         followerActorContext.setReplicatedLog(
1129                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1130         followerActorContext.setCommitIndex(0);
1131         followerActorContext.setLastApplied(0);
1132
1133         Follower follower = new Follower(followerActorContext);
1134         followerActor.underlyingActor().setBehavior(follower);
1135
1136         leader = new Leader(leaderActorContext);
1137
1138         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1139         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1140
1141         MessageCollectorActor.clearMessages(followerActor);
1142         MessageCollectorActor.clearMessages(leaderActor);
1143
1144         // Verify initial AppendEntries sent with the leader's current commit index.
1145         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1146         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1147         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1148
1149         leaderActor.underlyingActor().setBehavior(leader);
1150
1151         leader.handleMessage(followerActor, appendEntriesReply);
1152
1153         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1154         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1155
1156         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1157         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1158         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1159
1160         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1161         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1162                 appendEntries.getEntries().get(0).getData());
1163         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1164         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1165                 appendEntries.getEntries().get(1).getData());
1166
1167         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1168         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1169
1170         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1171
1172         ApplyState applyState = applyStateList.get(0);
1173         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1174         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1175         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1176                 applyState.getReplicatedLogEntry().getData());
1177
1178         applyState = applyStateList.get(1);
1179         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1180         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1181         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1182                 applyState.getReplicatedLogEntry().getData());
1183
1184         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1185         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1186     }
1187
1188     @Test
1189     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1190         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1191
1192         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1193         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1194                 new FiniteDuration(1000, TimeUnit.SECONDS));
1195
1196         leaderActorContext.setReplicatedLog(
1197                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1198         long leaderCommitIndex = 1;
1199         leaderActorContext.setCommitIndex(leaderCommitIndex);
1200         leaderActorContext.setLastApplied(leaderCommitIndex);
1201
1202         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1203         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1204
1205         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1206
1207         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1208         followerActorContext.setCommitIndex(-1);
1209         followerActorContext.setLastApplied(-1);
1210
1211         Follower follower = new Follower(followerActorContext);
1212         followerActor.underlyingActor().setBehavior(follower);
1213
1214         leader = new Leader(leaderActorContext);
1215
1216         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1217         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1218
1219         MessageCollectorActor.clearMessages(followerActor);
1220         MessageCollectorActor.clearMessages(leaderActor);
1221
1222         // Verify initial AppendEntries sent with the leader's current commit index.
1223         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1224         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1225         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1226
1227         leaderActor.underlyingActor().setBehavior(leader);
1228
1229         leader.handleMessage(followerActor, appendEntriesReply);
1230
1231         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1232         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1233
1234         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1235         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1236         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1237
1238         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1239         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1240                 appendEntries.getEntries().get(0).getData());
1241         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1242         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1243                 appendEntries.getEntries().get(1).getData());
1244
1245         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1246         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1247
1248         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1249
1250         ApplyState applyState = applyStateList.get(0);
1251         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1252         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1253         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1254                 applyState.getReplicatedLogEntry().getData());
1255
1256         applyState = applyStateList.get(1);
1257         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1258         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1259         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1260                 applyState.getReplicatedLogEntry().getData());
1261
1262         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1263         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1264     }
1265
1266     @Test
1267     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1268         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1269
1270         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1271         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1272                 new FiniteDuration(1000, TimeUnit.SECONDS));
1273
1274         leaderActorContext.setReplicatedLog(
1275                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1276         long leaderCommitIndex = 1;
1277         leaderActorContext.setCommitIndex(leaderCommitIndex);
1278         leaderActorContext.setLastApplied(leaderCommitIndex);
1279
1280         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1281         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1282
1283         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1284
1285         followerActorContext.setReplicatedLog(
1286                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1287         followerActorContext.setCommitIndex(-1);
1288         followerActorContext.setLastApplied(-1);
1289
1290         Follower follower = new Follower(followerActorContext);
1291         followerActor.underlyingActor().setBehavior(follower);
1292
1293         leader = new Leader(leaderActorContext);
1294
1295         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1296         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1297
1298         MessageCollectorActor.clearMessages(followerActor);
1299         MessageCollectorActor.clearMessages(leaderActor);
1300
1301         // Verify initial AppendEntries sent with the leader's current commit index.
1302         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1303         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1304         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1305
1306         leaderActor.underlyingActor().setBehavior(leader);
1307
1308         leader.handleMessage(followerActor, appendEntriesReply);
1309
1310         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1311         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1312
1313         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1314         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1315         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1316
1317         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1318         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1319         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1320                 appendEntries.getEntries().get(0).getData());
1321         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1322         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1323         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1324                 appendEntries.getEntries().get(1).getData());
1325
1326         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1327         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1328
1329         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1330
1331         ApplyState applyState = applyStateList.get(0);
1332         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1333         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1334         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1335                 applyState.getReplicatedLogEntry().getData());
1336
1337         applyState = applyStateList.get(1);
1338         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1339         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1340         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1341                 applyState.getReplicatedLogEntry().getData());
1342
1343         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1344         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1345         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1346     }
1347
1348     @Test
1349     public void testHandleAppendEntriesReplySuccess() throws Exception {
1350         logStart("testHandleAppendEntriesReplySuccess");
1351
1352         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1353
1354         leaderActorContext.setReplicatedLog(
1355                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1356
1357         leaderActorContext.setCommitIndex(1);
1358         leaderActorContext.setLastApplied(1);
1359         leaderActorContext.getTermInformation().update(1, "leader");
1360
1361         leader = new Leader(leaderActorContext);
1362
1363         short payloadVersion = 5;
1364         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1365
1366         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1367
1368         assertEquals(RaftState.Leader, raftActorBehavior.state());
1369
1370         assertEquals(2, leaderActorContext.getCommitIndex());
1371
1372         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1373                 leaderActor, ApplyJournalEntries.class);
1374
1375         assertEquals(2, leaderActorContext.getLastApplied());
1376
1377         assertEquals(2, applyJournalEntries.getToIndex());
1378
1379         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1380                 ApplyState.class);
1381
1382         assertEquals(1,applyStateList.size());
1383
1384         ApplyState applyState = applyStateList.get(0);
1385
1386         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1387
1388         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1389         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1390     }
1391
1392     @Test
1393     public void testHandleAppendEntriesReplyUnknownFollower(){
1394         logStart("testHandleAppendEntriesReplyUnknownFollower");
1395
1396         MockRaftActorContext leaderActorContext = createActorContext();
1397
1398         leader = new Leader(leaderActorContext);
1399
1400         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1401
1402         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1403
1404         assertEquals(RaftState.Leader, raftActorBehavior.state());
1405     }
1406
1407     @Test
1408     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1409         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1410
1411         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1412         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1413                 new FiniteDuration(1000, TimeUnit.SECONDS));
1414         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2);
1415
1416         leaderActorContext.setReplicatedLog(
1417                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1418         long leaderCommitIndex = 3;
1419         leaderActorContext.setCommitIndex(leaderCommitIndex);
1420         leaderActorContext.setLastApplied(leaderCommitIndex);
1421
1422         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1423         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1424         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1425         ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1426
1427         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1428
1429         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1430         followerActorContext.setCommitIndex(-1);
1431         followerActorContext.setLastApplied(-1);
1432
1433         Follower follower = new Follower(followerActorContext);
1434         followerActor.underlyingActor().setBehavior(follower);
1435
1436         leader = new Leader(leaderActorContext);
1437
1438         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1439         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1440
1441         MessageCollectorActor.clearMessages(followerActor);
1442         MessageCollectorActor.clearMessages(leaderActor);
1443
1444         // Verify initial AppendEntries sent with the leader's current commit index.
1445         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1446         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1447         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1448
1449         leaderActor.underlyingActor().setBehavior(leader);
1450
1451         leader.handleMessage(followerActor, appendEntriesReply);
1452
1453         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1454         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1455
1456         appendEntries = appendEntriesList.get(0);
1457         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1458         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1459         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1460
1461         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1462         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1463                 appendEntries.getEntries().get(0).getData());
1464         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1465         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1466                 appendEntries.getEntries().get(1).getData());
1467
1468         appendEntries = appendEntriesList.get(1);
1469         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1470         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1471         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1472
1473         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1474         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1475                 appendEntries.getEntries().get(0).getData());
1476         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1477         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1478                 appendEntries.getEntries().get(1).getData());
1479
1480         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1481         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1482
1483         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1484
1485         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1486         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1487     }
1488
1489     @Test
1490     public void testHandleRequestVoteReply(){
1491         logStart("testHandleRequestVoteReply");
1492
1493         MockRaftActorContext leaderActorContext = createActorContext();
1494
1495         leader = new Leader(leaderActorContext);
1496
1497         // Should be a no-op.
1498         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1499                 new RequestVoteReply(1, true));
1500
1501         assertEquals(RaftState.Leader, raftActorBehavior.state());
1502
1503         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1504
1505         assertEquals(RaftState.Leader, raftActorBehavior.state());
1506     }
1507
1508     @Test
1509     public void testIsolatedLeaderCheckNoFollowers() {
1510         logStart("testIsolatedLeaderCheckNoFollowers");
1511
1512         MockRaftActorContext leaderActorContext = createActorContext();
1513
1514         leader = new Leader(leaderActorContext);
1515         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1516         Assert.assertTrue(behavior instanceof Leader);
1517     }
1518
1519     @Test
1520     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1521         logStart("testIsolatedLeaderCheckTwoFollowers");
1522
1523         new JavaTestKit(getSystem()) {{
1524
1525             ActorRef followerActor1 = getTestActor();
1526             ActorRef followerActor2 = getTestActor();
1527
1528             MockRaftActorContext leaderActorContext = createActorContext();
1529
1530             Map<String, String> peerAddresses = new HashMap<>();
1531             peerAddresses.put("follower-1", followerActor1.path().toString());
1532             peerAddresses.put("follower-2", followerActor2.path().toString());
1533
1534             leaderActorContext.setPeerAddresses(peerAddresses);
1535
1536             leader = new Leader(leaderActorContext);
1537
1538             leader.markFollowerActive("follower-1");
1539             leader.markFollowerActive("follower-2");
1540             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1541             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1542                 behavior instanceof Leader);
1543
1544             // kill 1 follower and verify if that got killed
1545             final JavaTestKit probe = new JavaTestKit(getSystem());
1546             probe.watch(followerActor1);
1547             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1548             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1549             assertEquals(termMsg1.getActor(), followerActor1);
1550
1551             leader.markFollowerInActive("follower-1");
1552             leader.markFollowerActive("follower-2");
1553             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1554             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1555                 behavior instanceof Leader);
1556
1557             // kill 2nd follower and leader should change to Isolated leader
1558             followerActor2.tell(PoisonPill.getInstance(), null);
1559             probe.watch(followerActor2);
1560             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1561             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1562             assertEquals(termMsg2.getActor(), followerActor2);
1563
1564             leader.markFollowerInActive("follower-2");
1565             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1566             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1567                 behavior instanceof IsolatedLeader);
1568         }};
1569     }
1570
1571     @Test
1572     public void testLaggingFollowerStarvation() throws Exception {
1573         logStart("testLaggingFollowerStarvation");
1574         new JavaTestKit(getSystem()) {{
1575             String leaderActorId = actorFactory.generateActorId("leader");
1576             String follower1ActorId = actorFactory.generateActorId("follower");
1577             String follower2ActorId = actorFactory.generateActorId("follower");
1578
1579             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1580                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1581             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1582             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1583
1584             MockRaftActorContext leaderActorContext =
1585                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1586
1587             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1588             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1589             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1590
1591             leaderActorContext.setConfigParams(configParams);
1592
1593             leaderActorContext.setReplicatedLog(
1594                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1595
1596             Map<String, String> peerAddresses = new HashMap<>();
1597             peerAddresses.put(follower1ActorId,
1598                     follower1Actor.path().toString());
1599             peerAddresses.put(follower2ActorId,
1600                     follower2Actor.path().toString());
1601
1602             leaderActorContext.setPeerAddresses(peerAddresses);
1603             leaderActorContext.getTermInformation().update(1, leaderActorId);
1604
1605             RaftActorBehavior leader = createBehavior(leaderActorContext);
1606
1607             leaderActor.underlyingActor().setBehavior(leader);
1608
1609             for(int i=1;i<6;i++) {
1610                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1611                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1612                 assertTrue(newBehavior == leader);
1613                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1614             }
1615
1616             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1617             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1618
1619             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1620                     heartbeats.size() > 1);
1621
1622             // Check if follower-2 got AppendEntries during this time and was not starved
1623             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1624
1625             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1626                     appendEntries.size() > 1);
1627
1628         }};
1629     }
1630
1631     @Override
1632     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1633             ActorRef actorRef, RaftRPC rpc) throws Exception {
1634         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1635         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1636     }
1637
1638     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1639
1640         private final long electionTimeOutIntervalMillis;
1641         private final int snapshotChunkSize;
1642
1643         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1644             super();
1645             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1646             this.snapshotChunkSize = snapshotChunkSize;
1647         }
1648
1649         @Override
1650         public FiniteDuration getElectionTimeOutInterval() {
1651             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1652         }
1653
1654         @Override
1655         public int getSnapshotChunkSize() {
1656             return snapshotChunkSize;
1657         }
1658     }
1659 }