Merge changes Idfb4fbea,Ief82b050
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
29 import org.opendaylight.controller.cluster.raft.SerializationUtils;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
34 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
37 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
40 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import scala.concurrent.duration.FiniteDuration;
47
48 public class LeaderTest extends AbstractLeaderTest {
49
50     static final String FOLLOWER_ID = "follower";
51
52     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
53             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
54
55     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
56             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
57
58     private Leader leader;
59
60     @Override
61     @After
62     public void tearDown() throws Exception {
63         if(leader != null) {
64             leader.close();
65         }
66
67         super.tearDown();
68     }
69
70     @Test
71     public void testHandleMessageForUnknownMessage() throws Exception {
72         logStart("testHandleMessageForUnknownMessage");
73
74         leader = new Leader(createActorContext());
75
76         // handle message should return the Leader state when it receives an
77         // unknown message
78         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
79         Assert.assertTrue(behavior instanceof Leader);
80     }
81
82     @Test
83     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
84         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
85
86         MockRaftActorContext actorContext = createActorContextWithFollower();
87
88         long term = 1;
89         actorContext.getTermInformation().update(term, "");
90
91         leader = new Leader(actorContext);
92
93         // Leader should send an immediate heartbeat with no entries as follower is inactive.
94         long lastIndex = actorContext.getReplicatedLog().lastIndex();
95         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
96         assertEquals("getTerm", term, appendEntries.getTerm());
97         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
98         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
99         assertEquals("Entries size", 0, appendEntries.getEntries().size());
100
101         // The follower would normally reply - simulate that explicitly here.
102         leader.handleMessage(followerActor, new AppendEntriesReply(
103                 FOLLOWER_ID, term, true, lastIndex - 1, term));
104         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
105
106         followerActor.underlyingActor().clear();
107
108         // Sleep for the heartbeat interval so AppendEntries is sent.
109         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
110                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
111
112         leader.handleMessage(leaderActor, new SendHeartBeat());
113
114         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
115         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
116         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
117         assertEquals("Entries size", 1, appendEntries.getEntries().size());
118         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
119         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
120     }
121
122     @Test
123     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
124         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
125
126         MockRaftActorContext actorContext = createActorContextWithFollower();
127
128         long term = 1;
129         actorContext.getTermInformation().update(term, "");
130
131         leader = new Leader(actorContext);
132
133         // Leader will send an immediate heartbeat - ignore it.
134         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
135
136         // The follower would normally reply - simulate that explicitly here.
137         long lastIndex = actorContext.getReplicatedLog().lastIndex();
138         leader.handleMessage(followerActor, new AppendEntriesReply(
139                 FOLLOWER_ID, term, true, lastIndex, term));
140         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
141
142         followerActor.underlyingActor().clear();
143
144         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
145         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
146                 1, lastIndex + 1, payload);
147         actorContext.getReplicatedLog().append(newEntry);
148         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
149                 new Replicate(null, null, newEntry));
150
151         // State should not change
152         assertTrue(raftBehavior instanceof Leader);
153
154         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
155         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
156         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
157         assertEquals("Entries size", 1, appendEntries.getEntries().size());
158         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
159         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
160         assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
161     }
162
163     @Test
164     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
165         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
166
167         MockRaftActorContext actorContext = createActorContext();
168
169         leader = new Leader(actorContext);
170
171         actorContext.setLastApplied(0);
172
173         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
174         long term = actorContext.getTermInformation().getCurrentTerm();
175         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
176                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
177
178         actorContext.getReplicatedLog().append(newEntry);
179
180         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
181                 new Replicate(leaderActor, "state-id", newEntry));
182
183         // State should not change
184         assertTrue(raftBehavior instanceof Leader);
185
186         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
187
188         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
189         // one since lastApplied state is 0.
190         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
191                 leaderActor, ApplyState.class);
192         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
193
194         for(int i = 0; i <= newLogIndex - 1; i++ ) {
195             ApplyState applyState = applyStateList.get(i);
196             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
197             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
198         }
199
200         ApplyState last = applyStateList.get((int) newLogIndex - 1);
201         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
202         assertEquals("getIdentifier", "state-id", last.getIdentifier());
203     }
204
205     @Test
206     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
207         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
208
209         MockRaftActorContext actorContext = createActorContextWithFollower();
210
211         Map<String, String> leadersSnapshot = new HashMap<>();
212         leadersSnapshot.put("1", "A");
213         leadersSnapshot.put("2", "B");
214         leadersSnapshot.put("3", "C");
215
216         //clears leaders log
217         actorContext.getReplicatedLog().removeFrom(0);
218
219         final int followersLastIndex = 2;
220         final int snapshotIndex = 3;
221         final int newEntryIndex = 4;
222         final int snapshotTerm = 1;
223         final int currentTerm = 2;
224
225         // set the snapshot variables in replicatedlog
226         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
227         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
228         actorContext.setCommitIndex(followersLastIndex);
229         //set follower timeout to 2 mins, helps during debugging
230         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
231
232         leader = new Leader(actorContext);
233
234         // new entry
235         ReplicatedLogImplEntry entry =
236                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
237                         new MockRaftActorContext.MockPayload("D"));
238
239         //update follower timestamp
240         leader.markFollowerActive(FOLLOWER_ID);
241
242         ByteString bs = toByteString(leadersSnapshot);
243         leader.setSnapshot(Optional.of(bs));
244         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
245         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
246
247         //send first chunk and no InstallSnapshotReply received yet
248         fts.getNextChunk();
249         fts.incrementChunkIndex();
250
251         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
252                 TimeUnit.MILLISECONDS);
253
254         leader.handleMessage(leaderActor, new SendHeartBeat());
255
256         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
257
258         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
259
260         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
261
262         //InstallSnapshotReply received
263         fts.markSendStatus(true);
264
265         leader.handleMessage(leaderActor, new SendHeartBeat());
266
267         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
268
269         assertEquals(snapshotIndex, is.getLastIncludedIndex());
270     }
271
272     @Test
273     public void testSendAppendEntriesSnapshotScenario() throws Exception {
274         logStart("testSendAppendEntriesSnapshotScenario");
275
276         MockRaftActorContext actorContext = createActorContextWithFollower();
277
278         Map<String, String> leadersSnapshot = new HashMap<>();
279         leadersSnapshot.put("1", "A");
280         leadersSnapshot.put("2", "B");
281         leadersSnapshot.put("3", "C");
282
283         //clears leaders log
284         actorContext.getReplicatedLog().removeFrom(0);
285
286         final int followersLastIndex = 2;
287         final int snapshotIndex = 3;
288         final int newEntryIndex = 4;
289         final int snapshotTerm = 1;
290         final int currentTerm = 2;
291
292         // set the snapshot variables in replicatedlog
293         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
294         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
295         actorContext.setCommitIndex(followersLastIndex);
296
297         leader = new Leader(actorContext);
298
299         // Leader will send an immediate heartbeat - ignore it.
300         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
301
302         // new entry
303         ReplicatedLogImplEntry entry =
304                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
305                         new MockRaftActorContext.MockPayload("D"));
306
307         //update follower timestamp
308         leader.markFollowerActive(FOLLOWER_ID);
309
310         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
311         RaftActorBehavior raftBehavior = leader.handleMessage(
312                 leaderActor, new Replicate(null, "state-id", entry));
313
314         assertTrue(raftBehavior instanceof Leader);
315
316         MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
317     }
318
319     @Test
320     public void testInitiateInstallSnapshot() throws Exception {
321         logStart("testInitiateInstallSnapshot");
322
323         MockRaftActorContext actorContext = createActorContextWithFollower();
324
325         Map<String, String> leadersSnapshot = new HashMap<>();
326         leadersSnapshot.put("1", "A");
327         leadersSnapshot.put("2", "B");
328         leadersSnapshot.put("3", "C");
329
330         //clears leaders log
331         actorContext.getReplicatedLog().removeFrom(0);
332
333         final int followersLastIndex = 2;
334         final int snapshotIndex = 3;
335         final int newEntryIndex = 4;
336         final int snapshotTerm = 1;
337         final int currentTerm = 2;
338
339         // set the snapshot variables in replicatedlog
340         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
341         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
342         actorContext.setLastApplied(3);
343         actorContext.setCommitIndex(followersLastIndex);
344
345         leader = new Leader(actorContext);
346
347         // Leader will send an immediate heartbeat - ignore it.
348         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
349
350         // set the snapshot as absent and check if capture-snapshot is invoked.
351         leader.setSnapshot(Optional.<ByteString>absent());
352
353         // new entry
354         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
355                 new MockRaftActorContext.MockPayload("D"));
356
357         actorContext.getReplicatedLog().append(entry);
358
359         //update follower timestamp
360         leader.markFollowerActive(FOLLOWER_ID);
361
362         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
363
364         CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
365
366         assertTrue(cs.isInstallSnapshotInitiated());
367         assertEquals(3, cs.getLastAppliedIndex());
368         assertEquals(1, cs.getLastAppliedTerm());
369         assertEquals(4, cs.getLastIndex());
370         assertEquals(2, cs.getLastTerm());
371
372         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
373         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
374
375         List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
376         assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
377     }
378
379     @Test
380     public void testInstallSnapshot() throws Exception {
381         logStart("testInstallSnapshot");
382
383         MockRaftActorContext actorContext = createActorContextWithFollower();
384
385         Map<String, String> leadersSnapshot = new HashMap<>();
386         leadersSnapshot.put("1", "A");
387         leadersSnapshot.put("2", "B");
388         leadersSnapshot.put("3", "C");
389
390         //clears leaders log
391         actorContext.getReplicatedLog().removeFrom(0);
392
393         final int followersLastIndex = 2;
394         final int snapshotIndex = 3;
395         final int snapshotTerm = 1;
396         final int currentTerm = 2;
397
398         // set the snapshot variables in replicatedlog
399         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
400         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
401         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
402         actorContext.setCommitIndex(followersLastIndex);
403
404         leader = new Leader(actorContext);
405
406         // Ignore initial heartbeat.
407         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
408
409         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
410                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
411
412         assertTrue(raftBehavior instanceof Leader);
413
414         // check if installsnapshot gets called with the correct values.
415
416         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
417
418         assertNotNull(installSnapshot.getData());
419         assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
420         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
421
422         assertEquals(currentTerm, installSnapshot.getTerm());
423     }
424
425     @Test
426     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
427         logStart("testHandleInstallSnapshotReplyLastChunk");
428
429         MockRaftActorContext actorContext = createActorContextWithFollower();
430
431         final int followersLastIndex = 2;
432         final int snapshotIndex = 3;
433         final int snapshotTerm = 1;
434         final int currentTerm = 2;
435
436         actorContext.setCommitIndex(followersLastIndex);
437
438         leader = new Leader(actorContext);
439
440         // Ignore initial heartbeat.
441         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
442
443         Map<String, String> leadersSnapshot = new HashMap<>();
444         leadersSnapshot.put("1", "A");
445         leadersSnapshot.put("2", "B");
446         leadersSnapshot.put("3", "C");
447
448         // set the snapshot variables in replicatedlog
449
450         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
451         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
452         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
453
454         ByteString bs = toByteString(leadersSnapshot);
455         leader.setSnapshot(Optional.of(bs));
456         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
457         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
458         while(!fts.isLastChunk(fts.getChunkIndex())) {
459             fts.getNextChunk();
460             fts.incrementChunkIndex();
461         }
462
463         //clears leaders log
464         actorContext.getReplicatedLog().removeFrom(0);
465
466         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
467                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
468
469         assertTrue(raftBehavior instanceof Leader);
470
471         assertEquals(0, leader.followerSnapshotSize());
472         assertEquals(1, leader.followerLogSize());
473         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
474         assertNotNull(fli);
475         assertEquals(snapshotIndex, fli.getMatchIndex());
476         assertEquals(snapshotIndex, fli.getMatchIndex());
477         assertEquals(snapshotIndex + 1, fli.getNextIndex());
478     }
479
480     @Test
481     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
482         logStart("testSendSnapshotfromInstallSnapshotReply");
483
484         MockRaftActorContext actorContext = createActorContextWithFollower();
485
486         final int followersLastIndex = 2;
487         final int snapshotIndex = 3;
488         final int snapshotTerm = 1;
489         final int currentTerm = 2;
490
491         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
492             @Override
493             public int getSnapshotChunkSize() {
494                 return 50;
495             }
496         };
497         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
498         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
499
500         actorContext.setConfigParams(configParams);
501         actorContext.setCommitIndex(followersLastIndex);
502
503         leader = new Leader(actorContext);
504
505         Map<String, String> leadersSnapshot = new HashMap<>();
506         leadersSnapshot.put("1", "A");
507         leadersSnapshot.put("2", "B");
508         leadersSnapshot.put("3", "C");
509
510         // set the snapshot variables in replicatedlog
511         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
512         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
513         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
514
515         ByteString bs = toByteString(leadersSnapshot);
516         leader.setSnapshot(Optional.of(bs));
517
518         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
519
520         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
521
522         assertEquals(1, installSnapshot.getChunkIndex());
523         assertEquals(3, installSnapshot.getTotalChunks());
524
525         followerActor.underlyingActor().clear();
526         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
527                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
528
529         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
530
531         assertEquals(2, installSnapshot.getChunkIndex());
532         assertEquals(3, installSnapshot.getTotalChunks());
533
534         followerActor.underlyingActor().clear();
535         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
536                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
537
538         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
539
540         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
541         followerActor.underlyingActor().clear();
542         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
543                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
544
545         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
546
547         Assert.assertNull(installSnapshot);
548     }
549
550
551     @Test
552     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
553         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
554
555         MockRaftActorContext actorContext = createActorContextWithFollower();
556
557         final int followersLastIndex = 2;
558         final int snapshotIndex = 3;
559         final int snapshotTerm = 1;
560         final int currentTerm = 2;
561
562         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
563             @Override
564             public int getSnapshotChunkSize() {
565                 return 50;
566             }
567         });
568
569         actorContext.setCommitIndex(followersLastIndex);
570
571         leader = new Leader(actorContext);
572
573         Map<String, String> leadersSnapshot = new HashMap<>();
574         leadersSnapshot.put("1", "A");
575         leadersSnapshot.put("2", "B");
576         leadersSnapshot.put("3", "C");
577
578         // set the snapshot variables in replicatedlog
579         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
580         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
581         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
582
583         ByteString bs = toByteString(leadersSnapshot);
584         leader.setSnapshot(Optional.of(bs));
585
586         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
587         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
588
589         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
590
591         assertEquals(1, installSnapshot.getChunkIndex());
592         assertEquals(3, installSnapshot.getTotalChunks());
593
594         followerActor.underlyingActor().clear();
595
596         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
597                 FOLLOWER_ID, -1, false));
598
599         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
600                 TimeUnit.MILLISECONDS);
601
602         leader.handleMessage(leaderActor, new SendHeartBeat());
603
604         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
605
606         assertEquals(1, installSnapshot.getChunkIndex());
607         assertEquals(3, installSnapshot.getTotalChunks());
608     }
609
610     @Test
611     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
612         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
613
614         MockRaftActorContext actorContext = createActorContextWithFollower();
615
616         final int followersLastIndex = 2;
617         final int snapshotIndex = 3;
618         final int snapshotTerm = 1;
619         final int currentTerm = 2;
620
621         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
622             @Override
623             public int getSnapshotChunkSize() {
624                 return 50;
625             }
626         });
627
628         actorContext.setCommitIndex(followersLastIndex);
629
630         leader = new Leader(actorContext);
631
632         Map<String, String> leadersSnapshot = new HashMap<>();
633         leadersSnapshot.put("1", "A");
634         leadersSnapshot.put("2", "B");
635         leadersSnapshot.put("3", "C");
636
637         // set the snapshot variables in replicatedlog
638         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
639         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
640         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
641
642         ByteString bs = toByteString(leadersSnapshot);
643         leader.setSnapshot(Optional.of(bs));
644
645         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
646
647         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
648
649         assertEquals(1, installSnapshot.getChunkIndex());
650         assertEquals(3, installSnapshot.getTotalChunks());
651         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
652
653         int hashCode = installSnapshot.getData().hashCode();
654
655         followerActor.underlyingActor().clear();
656
657         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
658                 FOLLOWER_ID, 1, true));
659
660         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
661
662         assertEquals(2, installSnapshot.getChunkIndex());
663         assertEquals(3, installSnapshot.getTotalChunks());
664         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
665     }
666
667     @Test
668     public void testFollowerToSnapshotLogic() {
669         logStart("testFollowerToSnapshotLogic");
670
671         MockRaftActorContext actorContext = createActorContext();
672
673         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
674             @Override
675             public int getSnapshotChunkSize() {
676                 return 50;
677             }
678         });
679
680         leader = new Leader(actorContext);
681
682         Map<String, String> leadersSnapshot = new HashMap<>();
683         leadersSnapshot.put("1", "A");
684         leadersSnapshot.put("2", "B");
685         leadersSnapshot.put("3", "C");
686
687         ByteString bs = toByteString(leadersSnapshot);
688         byte[] barray = bs.toByteArray();
689
690         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
691         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
692
693         assertEquals(bs.size(), barray.length);
694
695         int chunkIndex=0;
696         for (int i=0; i < barray.length; i = i + 50) {
697             int j = i + 50;
698             chunkIndex++;
699
700             if (i + 50 > barray.length) {
701                 j = barray.length;
702             }
703
704             ByteString chunk = fts.getNextChunk();
705             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
706             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
707
708             fts.markSendStatus(true);
709             if (!fts.isLastChunk(chunkIndex)) {
710                 fts.incrementChunkIndex();
711             }
712         }
713
714         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
715     }
716
717     @Override protected RaftActorBehavior createBehavior(
718         RaftActorContext actorContext) {
719         return new Leader(actorContext);
720     }
721
722     @Override
723     protected MockRaftActorContext createActorContext() {
724         return createActorContext(leaderActor);
725     }
726
727     @Override
728     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
729         return createActorContext("leader", actorRef);
730     }
731
732     private MockRaftActorContext createActorContextWithFollower() {
733         MockRaftActorContext actorContext = createActorContext();
734         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
735                 followerActor.path().toString()).build());
736         return actorContext;
737     }
738
739     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
740         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
741         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
742         configParams.setElectionTimeoutFactor(100000);
743         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
744         context.setConfigParams(configParams);
745         return context;
746     }
747
748     @Test
749     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
750         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
751
752         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
753
754         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
755
756         Follower follower = new Follower(followerActorContext);
757         followerActor.underlyingActor().setBehavior(follower);
758
759         Map<String, String> peerAddresses = new HashMap<>();
760         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
761
762         leaderActorContext.setPeerAddresses(peerAddresses);
763
764         leaderActorContext.getReplicatedLog().removeFrom(0);
765
766         //create 3 entries
767         leaderActorContext.setReplicatedLog(
768                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
769
770         leaderActorContext.setCommitIndex(1);
771
772         followerActorContext.getReplicatedLog().removeFrom(0);
773
774         // follower too has the exact same log entries and has the same commit index
775         followerActorContext.setReplicatedLog(
776                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
777
778         followerActorContext.setCommitIndex(1);
779
780         leader = new Leader(leaderActorContext);
781
782         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
783
784         assertEquals(1, appendEntries.getLeaderCommit());
785         assertEquals(0, appendEntries.getEntries().size());
786         assertEquals(0, appendEntries.getPrevLogIndex());
787
788         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
789                 leaderActor, AppendEntriesReply.class);
790
791         assertEquals(2, appendEntriesReply.getLogLastIndex());
792         assertEquals(1, appendEntriesReply.getLogLastTerm());
793
794         // follower returns its next index
795         assertEquals(2, appendEntriesReply.getLogLastIndex());
796         assertEquals(1, appendEntriesReply.getLogLastTerm());
797
798         follower.close();
799     }
800
801     @Test
802     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
803         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
804
805         MockRaftActorContext leaderActorContext = createActorContext();
806
807         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
808
809         Follower follower = new Follower(followerActorContext);
810         followerActor.underlyingActor().setBehavior(follower);
811
812         Map<String, String> peerAddresses = new HashMap<>();
813         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
814
815         leaderActorContext.setPeerAddresses(peerAddresses);
816
817         leaderActorContext.getReplicatedLog().removeFrom(0);
818
819         leaderActorContext.setReplicatedLog(
820                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
821
822         leaderActorContext.setCommitIndex(1);
823
824         followerActorContext.getReplicatedLog().removeFrom(0);
825
826         followerActorContext.setReplicatedLog(
827                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
828
829         // follower has the same log entries but its commit index > leaders commit index
830         followerActorContext.setCommitIndex(2);
831
832         leader = new Leader(leaderActorContext);
833
834         // Initial heartbeat
835         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
836
837         assertEquals(1, appendEntries.getLeaderCommit());
838         assertEquals(0, appendEntries.getEntries().size());
839         assertEquals(0, appendEntries.getPrevLogIndex());
840
841         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
842                 leaderActor, AppendEntriesReply.class);
843
844         assertEquals(2, appendEntriesReply.getLogLastIndex());
845         assertEquals(1, appendEntriesReply.getLogLastTerm());
846
847         leaderActor.underlyingActor().setBehavior(follower);
848         leader.handleMessage(followerActor, appendEntriesReply);
849
850         leaderActor.underlyingActor().clear();
851         followerActor.underlyingActor().clear();
852
853         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
854                 TimeUnit.MILLISECONDS);
855
856         leader.handleMessage(leaderActor, new SendHeartBeat());
857
858         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
859
860         assertEquals(2, appendEntries.getLeaderCommit());
861         assertEquals(0, appendEntries.getEntries().size());
862         assertEquals(2, appendEntries.getPrevLogIndex());
863
864         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
865
866         assertEquals(2, appendEntriesReply.getLogLastIndex());
867         assertEquals(1, appendEntriesReply.getLogLastTerm());
868
869         assertEquals(2, followerActorContext.getCommitIndex());
870
871         follower.close();
872     }
873
874     @Test
875     public void testHandleAppendEntriesReplyFailure(){
876         logStart("testHandleAppendEntriesReplyFailure");
877
878         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
879
880         leader = new Leader(leaderActorContext);
881
882         // Send initial heartbeat reply with last index.
883         leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
884
885         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
886         assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
887
888         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
889
890         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
891
892         assertEquals(RaftState.Leader, raftActorBehavior.state());
893
894         assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
895     }
896
897     @Test
898     public void testHandleAppendEntriesReplySuccess() throws Exception {
899         logStart("testHandleAppendEntriesReplySuccess");
900
901         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
902
903         leaderActorContext.setReplicatedLog(
904                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
905
906         leaderActorContext.setCommitIndex(1);
907         leaderActorContext.setLastApplied(1);
908         leaderActorContext.getTermInformation().update(1, "leader");
909
910         leader = new Leader(leaderActorContext);
911
912         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
913
914         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
915
916         assertEquals(RaftState.Leader, raftActorBehavior.state());
917
918         assertEquals(2, leaderActorContext.getCommitIndex());
919
920         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
921                 leaderActor, ApplyJournalEntries.class);
922
923         assertEquals(2, leaderActorContext.getLastApplied());
924
925         assertEquals(2, applyJournalEntries.getToIndex());
926
927         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
928                 ApplyState.class);
929
930         assertEquals(1,applyStateList.size());
931
932         ApplyState applyState = applyStateList.get(0);
933
934         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
935     }
936
937     @Test
938     public void testHandleAppendEntriesReplyUnknownFollower(){
939         logStart("testHandleAppendEntriesReplyUnknownFollower");
940
941         MockRaftActorContext leaderActorContext = createActorContext();
942
943         leader = new Leader(leaderActorContext);
944
945         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
946
947         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
948
949         assertEquals(RaftState.Leader, raftActorBehavior.state());
950     }
951
952     @Test
953     public void testHandleRequestVoteReply(){
954         logStart("testHandleRequestVoteReply");
955
956         MockRaftActorContext leaderActorContext = createActorContext();
957
958         leader = new Leader(leaderActorContext);
959
960         // Should be a no-op.
961         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
962                 new RequestVoteReply(1, true));
963
964         assertEquals(RaftState.Leader, raftActorBehavior.state());
965
966         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
967
968         assertEquals(RaftState.Leader, raftActorBehavior.state());
969     }
970
971     @Test
972     public void testIsolatedLeaderCheckNoFollowers() {
973         logStart("testIsolatedLeaderCheckNoFollowers");
974
975         MockRaftActorContext leaderActorContext = createActorContext();
976
977         leader = new Leader(leaderActorContext);
978         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
979         Assert.assertTrue(behavior instanceof Leader);
980     }
981
982     @Test
983     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
984         logStart("testIsolatedLeaderCheckTwoFollowers");
985
986         new JavaTestKit(getSystem()) {{
987
988             ActorRef followerActor1 = getTestActor();
989             ActorRef followerActor2 = getTestActor();
990
991             MockRaftActorContext leaderActorContext = createActorContext();
992
993             Map<String, String> peerAddresses = new HashMap<>();
994             peerAddresses.put("follower-1", followerActor1.path().toString());
995             peerAddresses.put("follower-2", followerActor2.path().toString());
996
997             leaderActorContext.setPeerAddresses(peerAddresses);
998
999             leader = new Leader(leaderActorContext);
1000
1001             leader.markFollowerActive("follower-1");
1002             leader.markFollowerActive("follower-2");
1003             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1004             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1005                 behavior instanceof Leader);
1006
1007             // kill 1 follower and verify if that got killed
1008             final JavaTestKit probe = new JavaTestKit(getSystem());
1009             probe.watch(followerActor1);
1010             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1011             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1012             assertEquals(termMsg1.getActor(), followerActor1);
1013
1014             leader.markFollowerInActive("follower-1");
1015             leader.markFollowerActive("follower-2");
1016             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1017             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1018                 behavior instanceof Leader);
1019
1020             // kill 2nd follower and leader should change to Isolated leader
1021             followerActor2.tell(PoisonPill.getInstance(), null);
1022             probe.watch(followerActor2);
1023             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1024             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1025             assertEquals(termMsg2.getActor(), followerActor2);
1026
1027             leader.markFollowerInActive("follower-2");
1028             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1029             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1030                 behavior instanceof IsolatedLeader);
1031         }};
1032     }
1033
1034
1035     @Test
1036     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1037         logStart("testAppendEntryCallAtEndofAppendEntryReply");
1038
1039         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1040
1041         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1042         //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1043         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1044
1045         leaderActorContext.setConfigParams(configParams);
1046
1047         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1048
1049         followerActorContext.setConfigParams(configParams);
1050
1051         Follower follower = new Follower(followerActorContext);
1052         followerActor.underlyingActor().setBehavior(follower);
1053
1054         leaderActorContext.getReplicatedLog().removeFrom(0);
1055         leaderActorContext.setCommitIndex(-1);
1056         leaderActorContext.setLastApplied(-1);
1057
1058         followerActorContext.getReplicatedLog().removeFrom(0);
1059         followerActorContext.setCommitIndex(-1);
1060         followerActorContext.setLastApplied(-1);
1061
1062         leader = new Leader(leaderActorContext);
1063
1064         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1065                 leaderActor, AppendEntriesReply.class);
1066
1067         leader.handleMessage(followerActor, appendEntriesReply);
1068
1069         // Clear initial heartbeat messages
1070
1071         leaderActor.underlyingActor().clear();
1072         followerActor.underlyingActor().clear();
1073
1074         // create 3 entries
1075         leaderActorContext.setReplicatedLog(
1076                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1077         leaderActorContext.setCommitIndex(1);
1078         leaderActorContext.setLastApplied(1);
1079
1080         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1081                 TimeUnit.MILLISECONDS);
1082
1083         leader.handleMessage(leaderActor, new SendHeartBeat());
1084
1085         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1086
1087         // Should send first log entry
1088         assertEquals(1, appendEntries.getLeaderCommit());
1089         assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1090         assertEquals(-1, appendEntries.getPrevLogIndex());
1091
1092         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1093
1094         assertEquals(1, appendEntriesReply.getLogLastTerm());
1095         assertEquals(0, appendEntriesReply.getLogLastIndex());
1096
1097         followerActor.underlyingActor().clear();
1098
1099         leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1100
1101         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1102
1103         // Should send second log entry
1104         assertEquals(1, appendEntries.getLeaderCommit());
1105         assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1106
1107         follower.close();
1108     }
1109
1110     @Test
1111     public void testLaggingFollowerStarvation() throws Exception {
1112         logStart("testLaggingFollowerStarvation");
1113         new JavaTestKit(getSystem()) {{
1114             String leaderActorId = actorFactory.generateActorId("leader");
1115             String follower1ActorId = actorFactory.generateActorId("follower");
1116             String follower2ActorId = actorFactory.generateActorId("follower");
1117
1118             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1119                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1120             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1121             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1122
1123             MockRaftActorContext leaderActorContext =
1124                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1125
1126             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1127             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1128             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1129
1130             leaderActorContext.setConfigParams(configParams);
1131
1132             leaderActorContext.setReplicatedLog(
1133                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1134
1135             Map<String, String> peerAddresses = new HashMap<>();
1136             peerAddresses.put(follower1ActorId,
1137                     follower1Actor.path().toString());
1138             peerAddresses.put(follower2ActorId,
1139                     follower2Actor.path().toString());
1140
1141             leaderActorContext.setPeerAddresses(peerAddresses);
1142             leaderActorContext.getTermInformation().update(1, leaderActorId);
1143
1144             RaftActorBehavior leader = createBehavior(leaderActorContext);
1145
1146             leaderActor.underlyingActor().setBehavior(leader);
1147
1148             for(int i=1;i<6;i++) {
1149                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1150                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
1151                 assertTrue(newBehavior == leader);
1152                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1153             }
1154
1155             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1156             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1157
1158             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1159                     heartbeats.size() > 1);
1160
1161             // Check if follower-2 got AppendEntries during this time and was not starved
1162             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1163
1164             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1165                     appendEntries.size() > 1);
1166
1167         }};
1168     }
1169
1170     @Override
1171     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1172             ActorRef actorRef, RaftRPC rpc) throws Exception {
1173         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1174         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1175     }
1176
1177     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1178
1179         private final long electionTimeOutIntervalMillis;
1180         private final int snapshotChunkSize;
1181
1182         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1183             super();
1184             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1185             this.snapshotChunkSize = snapshotChunkSize;
1186         }
1187
1188         @Override
1189         public FiniteDuration getElectionTimeOutInterval() {
1190             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1191         }
1192
1193         @Override
1194         public int getSnapshotChunkSize() {
1195             return snapshotChunkSize;
1196         }
1197     }
1198 }