27854f40e957a7a395fcd3e8c0cd5df871b3e00e
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
35 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
38 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
44 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
45 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
46 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
47 import scala.concurrent.duration.FiniteDuration;
48
49 public class LeaderTest extends AbstractLeaderTest {
50
51     static final String FOLLOWER_ID = "follower";
52     public static final String LEADER_ID = "leader";
53
54     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
55             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
56
57     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
58             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
59
60     private Leader leader;
61
62     @Override
63     @After
64     public void tearDown() throws Exception {
65         if(leader != null) {
66             leader.close();
67         }
68
69         super.tearDown();
70     }
71
72     @Test
73     public void testHandleMessageForUnknownMessage() throws Exception {
74         logStart("testHandleMessageForUnknownMessage");
75
76         leader = new Leader(createActorContext());
77
78         // handle message should return the Leader state when it receives an
79         // unknown message
80         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
81         Assert.assertTrue(behavior instanceof Leader);
82     }
83
84     @Test
85     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
86         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
87
88         MockRaftActorContext actorContext = createActorContextWithFollower();
89         short payloadVersion = (short)5;
90         actorContext.setPayloadVersion(payloadVersion);
91
92         long term = 1;
93         actorContext.getTermInformation().update(term, "");
94
95         leader = new Leader(actorContext);
96
97         // Leader should send an immediate heartbeat with no entries as follower is inactive.
98         long lastIndex = actorContext.getReplicatedLog().lastIndex();
99         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
100         assertEquals("getTerm", term, appendEntries.getTerm());
101         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
102         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
103         assertEquals("Entries size", 0, appendEntries.getEntries().size());
104         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
105
106         // The follower would normally reply - simulate that explicitly here.
107         leader.handleMessage(followerActor, new AppendEntriesReply(
108                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
109         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
110
111         followerActor.underlyingActor().clear();
112
113         // Sleep for the heartbeat interval so AppendEntries is sent.
114         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
115                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
116
117         leader.handleMessage(leaderActor, new SendHeartBeat());
118
119         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
120         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
121         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
122         assertEquals("Entries size", 1, appendEntries.getEntries().size());
123         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
124         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
125         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
126     }
127
128
129     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
130         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
131         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
132                 1, index, payload);
133         actorContext.getReplicatedLog().append(newEntry);
134         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
135     }
136
137     @Test
138     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
139         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
140
141         MockRaftActorContext actorContext = createActorContextWithFollower();
142
143         long term = 1;
144         actorContext.getTermInformation().update(term, "");
145
146         leader = new Leader(actorContext);
147
148         // Leader will send an immediate heartbeat - ignore it.
149         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
150
151         // The follower would normally reply - simulate that explicitly here.
152         long lastIndex = actorContext.getReplicatedLog().lastIndex();
153         leader.handleMessage(followerActor, new AppendEntriesReply(
154                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
155         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
156
157         followerActor.underlyingActor().clear();
158
159         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
160         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
161                 1, lastIndex + 1, payload);
162         actorContext.getReplicatedLog().append(newEntry);
163         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
164
165         // State should not change
166         assertTrue(raftBehavior instanceof Leader);
167
168         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
169         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
170         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
171         assertEquals("Entries size", 1, appendEntries.getEntries().size());
172         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
173         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
174         assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
175     }
176
177     @Test
178     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
179         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
180
181         MockRaftActorContext actorContext = createActorContextWithFollower();
182         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
183             @Override
184             public FiniteDuration getHeartBeatInterval() {
185                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
186             }
187         });
188
189         long term = 1;
190         actorContext.getTermInformation().update(term, "");
191
192         leader = new Leader(actorContext);
193
194         // Leader will send an immediate heartbeat - ignore it.
195         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
196
197         // The follower would normally reply - simulate that explicitly here.
198         long lastIndex = actorContext.getReplicatedLog().lastIndex();
199         leader.handleMessage(followerActor, new AppendEntriesReply(
200                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
201         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
202
203         followerActor.underlyingActor().clear();
204
205         for(int i=0;i<5;i++) {
206             sendReplicate(actorContext, lastIndex+i+1);
207         }
208
209         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
210         // We expect only 1 message to be sent because of two reasons,
211         // - an append entries reply was not received
212         // - the heartbeat interval has not expired
213         // In this scenario if multiple messages are sent they would likely be duplicates
214         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
215     }
216
217     @Test
218     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
219         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
220
221         MockRaftActorContext actorContext = createActorContextWithFollower();
222         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
223             @Override
224             public FiniteDuration getHeartBeatInterval() {
225                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
226             }
227         });
228
229         long term = 1;
230         actorContext.getTermInformation().update(term, "");
231
232         leader = new Leader(actorContext);
233
234         // Leader will send an immediate heartbeat - ignore it.
235         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
236
237         // The follower would normally reply - simulate that explicitly here.
238         long lastIndex = actorContext.getReplicatedLog().lastIndex();
239         leader.handleMessage(followerActor, new AppendEntriesReply(
240                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
241         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
242
243         followerActor.underlyingActor().clear();
244
245         for(int i=0;i<3;i++) {
246             sendReplicate(actorContext, lastIndex+i+1);
247             leader.handleMessage(followerActor, new AppendEntriesReply(
248                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
249
250         }
251
252         for(int i=3;i<5;i++) {
253             sendReplicate(actorContext, lastIndex + i + 1);
254         }
255
256         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
257         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
258         // get sent to the follower - but not the 5th
259         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
260
261         for(int i=0;i<4;i++) {
262             long expected = allMessages.get(i).getEntries().get(0).getIndex();
263             assertEquals(expected, i+2);
264         }
265     }
266
267     @Test
268     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
269         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
270
271         MockRaftActorContext actorContext = createActorContextWithFollower();
272         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
273             @Override
274             public FiniteDuration getHeartBeatInterval() {
275                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
276             }
277         });
278
279         long term = 1;
280         actorContext.getTermInformation().update(term, "");
281
282         leader = new Leader(actorContext);
283
284         // Leader will send an immediate heartbeat - ignore it.
285         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
286
287         // The follower would normally reply - simulate that explicitly here.
288         long lastIndex = actorContext.getReplicatedLog().lastIndex();
289         leader.handleMessage(followerActor, new AppendEntriesReply(
290                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
291         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
292
293         followerActor.underlyingActor().clear();
294
295         sendReplicate(actorContext, lastIndex+1);
296
297         // Wait slightly longer than heartbeat duration
298         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
299
300         leader.handleMessage(leaderActor, new SendHeartBeat());
301
302         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
303         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
304
305         assertEquals(1, allMessages.get(0).getEntries().size());
306         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
307         assertEquals(1, allMessages.get(1).getEntries().size());
308         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
309
310     }
311
312     @Test
313     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
314         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
315
316         MockRaftActorContext actorContext = createActorContextWithFollower();
317         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
318             @Override
319             public FiniteDuration getHeartBeatInterval() {
320                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
321             }
322         });
323
324         long term = 1;
325         actorContext.getTermInformation().update(term, "");
326
327         leader = new Leader(actorContext);
328
329         // Leader will send an immediate heartbeat - ignore it.
330         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
331
332         // The follower would normally reply - simulate that explicitly here.
333         long lastIndex = actorContext.getReplicatedLog().lastIndex();
334         leader.handleMessage(followerActor, new AppendEntriesReply(
335                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
336         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
337
338         followerActor.underlyingActor().clear();
339
340         for(int i=0;i<3;i++) {
341             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
342             leader.handleMessage(leaderActor, new SendHeartBeat());
343         }
344
345         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
346         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
347     }
348
349     @Test
350     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
351         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
352
353         MockRaftActorContext actorContext = createActorContextWithFollower();
354         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
355             @Override
356             public FiniteDuration getHeartBeatInterval() {
357                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
358             }
359         });
360
361         long term = 1;
362         actorContext.getTermInformation().update(term, "");
363
364         leader = new Leader(actorContext);
365
366         // Leader will send an immediate heartbeat - ignore it.
367         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
368
369         // The follower would normally reply - simulate that explicitly here.
370         long lastIndex = actorContext.getReplicatedLog().lastIndex();
371         leader.handleMessage(followerActor, new AppendEntriesReply(
372                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
373         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
374
375         followerActor.underlyingActor().clear();
376
377         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
378         leader.handleMessage(leaderActor, new SendHeartBeat());
379         sendReplicate(actorContext, lastIndex+1);
380
381         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
382         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
383
384         assertEquals(0, allMessages.get(0).getEntries().size());
385         assertEquals(1, allMessages.get(1).getEntries().size());
386     }
387
388
389     @Test
390     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
391         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
392
393         MockRaftActorContext actorContext = createActorContext();
394
395         leader = new Leader(actorContext);
396
397         actorContext.setLastApplied(0);
398
399         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
400         long term = actorContext.getTermInformation().getCurrentTerm();
401         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
402                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
403
404         actorContext.getReplicatedLog().append(newEntry);
405
406         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
407                 new Replicate(leaderActor, "state-id", newEntry));
408
409         // State should not change
410         assertTrue(raftBehavior instanceof Leader);
411
412         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
413
414         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
415         // one since lastApplied state is 0.
416         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
417                 leaderActor, ApplyState.class);
418         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
419
420         for(int i = 0; i <= newLogIndex - 1; i++ ) {
421             ApplyState applyState = applyStateList.get(i);
422             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
423             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
424         }
425
426         ApplyState last = applyStateList.get((int) newLogIndex - 1);
427         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
428         assertEquals("getIdentifier", "state-id", last.getIdentifier());
429     }
430
431     @Test
432     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
433         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
434
435         MockRaftActorContext actorContext = createActorContextWithFollower();
436
437         Map<String, String> leadersSnapshot = new HashMap<>();
438         leadersSnapshot.put("1", "A");
439         leadersSnapshot.put("2", "B");
440         leadersSnapshot.put("3", "C");
441
442         //clears leaders log
443         actorContext.getReplicatedLog().removeFrom(0);
444
445         final int followersLastIndex = 2;
446         final int snapshotIndex = 3;
447         final int newEntryIndex = 4;
448         final int snapshotTerm = 1;
449         final int currentTerm = 2;
450
451         // set the snapshot variables in replicatedlog
452         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
453         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
454         actorContext.setCommitIndex(followersLastIndex);
455         //set follower timeout to 2 mins, helps during debugging
456         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
457
458         leader = new Leader(actorContext);
459
460         // new entry
461         ReplicatedLogImplEntry entry =
462                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
463                         new MockRaftActorContext.MockPayload("D"));
464
465         //update follower timestamp
466         leader.markFollowerActive(FOLLOWER_ID);
467
468         ByteString bs = toByteString(leadersSnapshot);
469         leader.setSnapshot(Optional.of(bs));
470         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
471         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
472
473         //send first chunk and no InstallSnapshotReply received yet
474         fts.getNextChunk();
475         fts.incrementChunkIndex();
476
477         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
478                 TimeUnit.MILLISECONDS);
479
480         leader.handleMessage(leaderActor, new SendHeartBeat());
481
482         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
483
484         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
485
486         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
487
488         //InstallSnapshotReply received
489         fts.markSendStatus(true);
490
491         leader.handleMessage(leaderActor, new SendHeartBeat());
492
493         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
494
495         assertEquals(snapshotIndex, is.getLastIncludedIndex());
496     }
497
498     @Test
499     public void testSendAppendEntriesSnapshotScenario() throws Exception {
500         logStart("testSendAppendEntriesSnapshotScenario");
501
502         MockRaftActorContext actorContext = createActorContextWithFollower();
503
504         Map<String, String> leadersSnapshot = new HashMap<>();
505         leadersSnapshot.put("1", "A");
506         leadersSnapshot.put("2", "B");
507         leadersSnapshot.put("3", "C");
508
509         //clears leaders log
510         actorContext.getReplicatedLog().removeFrom(0);
511
512         final int followersLastIndex = 2;
513         final int snapshotIndex = 3;
514         final int newEntryIndex = 4;
515         final int snapshotTerm = 1;
516         final int currentTerm = 2;
517
518         // set the snapshot variables in replicatedlog
519         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
520         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
521         actorContext.setCommitIndex(followersLastIndex);
522
523         leader = new Leader(actorContext);
524
525         // Leader will send an immediate heartbeat - ignore it.
526         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
527
528         // new entry
529         ReplicatedLogImplEntry entry =
530                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
531                         new MockRaftActorContext.MockPayload("D"));
532
533         actorContext.getReplicatedLog().append(entry);
534
535         //update follower timestamp
536         leader.markFollowerActive(FOLLOWER_ID);
537
538         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
539         RaftActorBehavior raftBehavior = leader.handleMessage(
540                 leaderActor, new Replicate(null, "state-id", entry));
541
542         assertTrue(raftBehavior instanceof Leader);
543
544         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
545     }
546
547     @Test
548     public void testInitiateInstallSnapshot() throws Exception {
549         logStart("testInitiateInstallSnapshot");
550
551         MockRaftActorContext actorContext = createActorContextWithFollower();
552
553         Map<String, String> leadersSnapshot = new HashMap<>();
554         leadersSnapshot.put("1", "A");
555         leadersSnapshot.put("2", "B");
556         leadersSnapshot.put("3", "C");
557
558         //clears leaders log
559         actorContext.getReplicatedLog().removeFrom(0);
560
561         final int followersLastIndex = 2;
562         final int snapshotIndex = 3;
563         final int newEntryIndex = 4;
564         final int snapshotTerm = 1;
565         final int currentTerm = 2;
566
567         // set the snapshot variables in replicatedlog
568         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
569         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
570         actorContext.setLastApplied(3);
571         actorContext.setCommitIndex(followersLastIndex);
572
573         leader = new Leader(actorContext);
574
575         // Leader will send an immediate heartbeat - ignore it.
576         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
577
578         // set the snapshot as absent and check if capture-snapshot is invoked.
579         leader.setSnapshot(Optional.<ByteString>absent());
580
581         // new entry
582         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
583                 new MockRaftActorContext.MockPayload("D"));
584
585         actorContext.getReplicatedLog().append(entry);
586
587         //update follower timestamp
588         leader.markFollowerActive(FOLLOWER_ID);
589
590         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
591
592         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
593
594         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
595
596         assertTrue(cs.isInstallSnapshotInitiated());
597         assertEquals(3, cs.getLastAppliedIndex());
598         assertEquals(1, cs.getLastAppliedTerm());
599         assertEquals(4, cs.getLastIndex());
600         assertEquals(2, cs.getLastTerm());
601
602         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
603         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
604
605         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
606     }
607
608     @Test
609     public void testInstallSnapshot() throws Exception {
610         logStart("testInstallSnapshot");
611
612         MockRaftActorContext actorContext = createActorContextWithFollower();
613
614         Map<String, String> leadersSnapshot = new HashMap<>();
615         leadersSnapshot.put("1", "A");
616         leadersSnapshot.put("2", "B");
617         leadersSnapshot.put("3", "C");
618
619         //clears leaders log
620         actorContext.getReplicatedLog().removeFrom(0);
621
622         final int followersLastIndex = 2;
623         final int snapshotIndex = 3;
624         final int snapshotTerm = 1;
625         final int currentTerm = 2;
626
627         // set the snapshot variables in replicatedlog
628         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
629         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
630         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
631         actorContext.setCommitIndex(followersLastIndex);
632
633         leader = new Leader(actorContext);
634
635         // Ignore initial heartbeat.
636         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
637
638         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
639                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
640
641         assertTrue(raftBehavior instanceof Leader);
642
643         // check if installsnapshot gets called with the correct values.
644
645         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
646
647         assertNotNull(installSnapshot.getData());
648         assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
649         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
650
651         assertEquals(currentTerm, installSnapshot.getTerm());
652     }
653
654     @Test
655     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
656         logStart("testHandleInstallSnapshotReplyLastChunk");
657
658         MockRaftActorContext actorContext = createActorContextWithFollower();
659
660         final int followersLastIndex = 2;
661         final int snapshotIndex = 3;
662         final int snapshotTerm = 1;
663         final int currentTerm = 2;
664
665         actorContext.setCommitIndex(followersLastIndex);
666
667         leader = new Leader(actorContext);
668
669         // Ignore initial heartbeat.
670         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
671
672         Map<String, String> leadersSnapshot = new HashMap<>();
673         leadersSnapshot.put("1", "A");
674         leadersSnapshot.put("2", "B");
675         leadersSnapshot.put("3", "C");
676
677         // set the snapshot variables in replicatedlog
678
679         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
680         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
681         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
682
683         ByteString bs = toByteString(leadersSnapshot);
684         leader.setSnapshot(Optional.of(bs));
685         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
686         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
687         while(!fts.isLastChunk(fts.getChunkIndex())) {
688             fts.getNextChunk();
689             fts.incrementChunkIndex();
690         }
691
692         //clears leaders log
693         actorContext.getReplicatedLog().removeFrom(0);
694
695         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
696                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
697
698         assertTrue(raftBehavior instanceof Leader);
699
700         assertEquals(0, leader.followerSnapshotSize());
701         assertEquals(1, leader.followerLogSize());
702         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
703         assertNotNull(fli);
704         assertEquals(snapshotIndex, fli.getMatchIndex());
705         assertEquals(snapshotIndex, fli.getMatchIndex());
706         assertEquals(snapshotIndex + 1, fli.getNextIndex());
707     }
708
709     @Test
710     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
711         logStart("testSendSnapshotfromInstallSnapshotReply");
712
713         MockRaftActorContext actorContext = createActorContextWithFollower();
714
715         final int followersLastIndex = 2;
716         final int snapshotIndex = 3;
717         final int snapshotTerm = 1;
718         final int currentTerm = 2;
719
720         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
721             @Override
722             public int getSnapshotChunkSize() {
723                 return 50;
724             }
725         };
726         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
727         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
728
729         actorContext.setConfigParams(configParams);
730         actorContext.setCommitIndex(followersLastIndex);
731
732         leader = new Leader(actorContext);
733
734         Map<String, String> leadersSnapshot = new HashMap<>();
735         leadersSnapshot.put("1", "A");
736         leadersSnapshot.put("2", "B");
737         leadersSnapshot.put("3", "C");
738
739         // set the snapshot variables in replicatedlog
740         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
741         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
742         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
743
744         ByteString bs = toByteString(leadersSnapshot);
745         leader.setSnapshot(Optional.of(bs));
746
747         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
748
749         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
750
751         assertEquals(1, installSnapshot.getChunkIndex());
752         assertEquals(3, installSnapshot.getTotalChunks());
753
754         followerActor.underlyingActor().clear();
755         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
756                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
757
758         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
759
760         assertEquals(2, installSnapshot.getChunkIndex());
761         assertEquals(3, installSnapshot.getTotalChunks());
762
763         followerActor.underlyingActor().clear();
764         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
765                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
766
767         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
768
769         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
770         followerActor.underlyingActor().clear();
771         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
772                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
773
774         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
775
776         Assert.assertNull(installSnapshot);
777     }
778
779
780     @Test
781     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
782         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
783
784         MockRaftActorContext actorContext = createActorContextWithFollower();
785
786         final int followersLastIndex = 2;
787         final int snapshotIndex = 3;
788         final int snapshotTerm = 1;
789         final int currentTerm = 2;
790
791         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
792             @Override
793             public int getSnapshotChunkSize() {
794                 return 50;
795             }
796         });
797
798         actorContext.setCommitIndex(followersLastIndex);
799
800         leader = new Leader(actorContext);
801
802         Map<String, String> leadersSnapshot = new HashMap<>();
803         leadersSnapshot.put("1", "A");
804         leadersSnapshot.put("2", "B");
805         leadersSnapshot.put("3", "C");
806
807         // set the snapshot variables in replicatedlog
808         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
809         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
810         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
811
812         ByteString bs = toByteString(leadersSnapshot);
813         leader.setSnapshot(Optional.of(bs));
814
815         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
816         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
817
818         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
819
820         assertEquals(1, installSnapshot.getChunkIndex());
821         assertEquals(3, installSnapshot.getTotalChunks());
822
823         followerActor.underlyingActor().clear();
824
825         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
826                 FOLLOWER_ID, -1, false));
827
828         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
829                 TimeUnit.MILLISECONDS);
830
831         leader.handleMessage(leaderActor, new SendHeartBeat());
832
833         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
834
835         assertEquals(1, installSnapshot.getChunkIndex());
836         assertEquals(3, installSnapshot.getTotalChunks());
837     }
838
839     @Test
840     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
841         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
842
843         MockRaftActorContext actorContext = createActorContextWithFollower();
844
845         final int followersLastIndex = 2;
846         final int snapshotIndex = 3;
847         final int snapshotTerm = 1;
848         final int currentTerm = 2;
849
850         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
851             @Override
852             public int getSnapshotChunkSize() {
853                 return 50;
854             }
855         });
856
857         actorContext.setCommitIndex(followersLastIndex);
858
859         leader = new Leader(actorContext);
860
861         Map<String, String> leadersSnapshot = new HashMap<>();
862         leadersSnapshot.put("1", "A");
863         leadersSnapshot.put("2", "B");
864         leadersSnapshot.put("3", "C");
865
866         // set the snapshot variables in replicatedlog
867         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
868         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
869         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
870
871         ByteString bs = toByteString(leadersSnapshot);
872         leader.setSnapshot(Optional.of(bs));
873
874         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
875
876         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
877
878         assertEquals(1, installSnapshot.getChunkIndex());
879         assertEquals(3, installSnapshot.getTotalChunks());
880         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
881
882         int hashCode = installSnapshot.getData().hashCode();
883
884         followerActor.underlyingActor().clear();
885
886         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
887                 FOLLOWER_ID, 1, true));
888
889         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
890
891         assertEquals(2, installSnapshot.getChunkIndex());
892         assertEquals(3, installSnapshot.getTotalChunks());
893         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
894     }
895
896     @Test
897     public void testFollowerToSnapshotLogic() {
898         logStart("testFollowerToSnapshotLogic");
899
900         MockRaftActorContext actorContext = createActorContext();
901
902         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
903             @Override
904             public int getSnapshotChunkSize() {
905                 return 50;
906             }
907         });
908
909         leader = new Leader(actorContext);
910
911         Map<String, String> leadersSnapshot = new HashMap<>();
912         leadersSnapshot.put("1", "A");
913         leadersSnapshot.put("2", "B");
914         leadersSnapshot.put("3", "C");
915
916         ByteString bs = toByteString(leadersSnapshot);
917         byte[] barray = bs.toByteArray();
918
919         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
920         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
921
922         assertEquals(bs.size(), barray.length);
923
924         int chunkIndex=0;
925         for (int i=0; i < barray.length; i = i + 50) {
926             int j = i + 50;
927             chunkIndex++;
928
929             if (i + 50 > barray.length) {
930                 j = barray.length;
931             }
932
933             ByteString chunk = fts.getNextChunk();
934             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
935             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
936
937             fts.markSendStatus(true);
938             if (!fts.isLastChunk(chunkIndex)) {
939                 fts.incrementChunkIndex();
940             }
941         }
942
943         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
944     }
945
946     @Override protected RaftActorBehavior createBehavior(
947         RaftActorContext actorContext) {
948         return new Leader(actorContext);
949     }
950
951     @Override
952     protected MockRaftActorContext createActorContext() {
953         return createActorContext(leaderActor);
954     }
955
956     @Override
957     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
958         return createActorContext(LEADER_ID, actorRef);
959     }
960
961     private MockRaftActorContext createActorContextWithFollower() {
962         MockRaftActorContext actorContext = createActorContext();
963         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
964                 followerActor.path().toString()).build());
965         return actorContext;
966     }
967
968     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
969         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
970         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
971         configParams.setElectionTimeoutFactor(100000);
972         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
973         context.setConfigParams(configParams);
974         return context;
975     }
976
977     private MockRaftActorContext createFollowerActorContextWithLeader() {
978         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
979         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
980         followerConfig.setElectionTimeoutFactor(10000);
981         followerActorContext.setConfigParams(followerConfig);
982         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
983         return followerActorContext;
984     }
985
986     @Test
987     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
988         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
989
990         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
991
992         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
993
994         Follower follower = new Follower(followerActorContext);
995         followerActor.underlyingActor().setBehavior(follower);
996
997         Map<String, String> peerAddresses = new HashMap<>();
998         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
999
1000         leaderActorContext.setPeerAddresses(peerAddresses);
1001
1002         leaderActorContext.getReplicatedLog().removeFrom(0);
1003
1004         //create 3 entries
1005         leaderActorContext.setReplicatedLog(
1006                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1007
1008         leaderActorContext.setCommitIndex(1);
1009
1010         followerActorContext.getReplicatedLog().removeFrom(0);
1011
1012         // follower too has the exact same log entries and has the same commit index
1013         followerActorContext.setReplicatedLog(
1014                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1015
1016         followerActorContext.setCommitIndex(1);
1017
1018         leader = new Leader(leaderActorContext);
1019
1020         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1021
1022         assertEquals(1, appendEntries.getLeaderCommit());
1023         assertEquals(0, appendEntries.getEntries().size());
1024         assertEquals(0, appendEntries.getPrevLogIndex());
1025
1026         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1027                 leaderActor, AppendEntriesReply.class);
1028
1029         assertEquals(2, appendEntriesReply.getLogLastIndex());
1030         assertEquals(1, appendEntriesReply.getLogLastTerm());
1031
1032         // follower returns its next index
1033         assertEquals(2, appendEntriesReply.getLogLastIndex());
1034         assertEquals(1, appendEntriesReply.getLogLastTerm());
1035
1036         follower.close();
1037     }
1038
1039     @Test
1040     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1041         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1042
1043         MockRaftActorContext leaderActorContext = createActorContext();
1044
1045         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1046         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1047
1048         Follower follower = new Follower(followerActorContext);
1049         followerActor.underlyingActor().setBehavior(follower);
1050
1051         Map<String, String> leaderPeerAddresses = new HashMap<>();
1052         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1053
1054         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1055
1056         leaderActorContext.getReplicatedLog().removeFrom(0);
1057
1058         leaderActorContext.setReplicatedLog(
1059                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1060
1061         leaderActorContext.setCommitIndex(1);
1062
1063         followerActorContext.getReplicatedLog().removeFrom(0);
1064
1065         followerActorContext.setReplicatedLog(
1066                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1067
1068         // follower has the same log entries but its commit index > leaders commit index
1069         followerActorContext.setCommitIndex(2);
1070
1071         leader = new Leader(leaderActorContext);
1072
1073         // Initial heartbeat
1074         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1075
1076         assertEquals(1, appendEntries.getLeaderCommit());
1077         assertEquals(0, appendEntries.getEntries().size());
1078         assertEquals(0, appendEntries.getPrevLogIndex());
1079
1080         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1081                 leaderActor, AppendEntriesReply.class);
1082
1083         assertEquals(2, appendEntriesReply.getLogLastIndex());
1084         assertEquals(1, appendEntriesReply.getLogLastTerm());
1085
1086         leaderActor.underlyingActor().setBehavior(follower);
1087         leader.handleMessage(followerActor, appendEntriesReply);
1088
1089         leaderActor.underlyingActor().clear();
1090         followerActor.underlyingActor().clear();
1091
1092         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1093                 TimeUnit.MILLISECONDS);
1094
1095         leader.handleMessage(leaderActor, new SendHeartBeat());
1096
1097         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1098
1099         assertEquals(2, appendEntries.getLeaderCommit());
1100         assertEquals(0, appendEntries.getEntries().size());
1101         assertEquals(2, appendEntries.getPrevLogIndex());
1102
1103         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1104
1105         assertEquals(2, appendEntriesReply.getLogLastIndex());
1106         assertEquals(1, appendEntriesReply.getLogLastTerm());
1107
1108         assertEquals(2, followerActorContext.getCommitIndex());
1109
1110         follower.close();
1111     }
1112
1113     @Test
1114     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1115         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1116
1117         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1118         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1119                 new FiniteDuration(1000, TimeUnit.SECONDS));
1120
1121         leaderActorContext.setReplicatedLog(
1122                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1123         long leaderCommitIndex = 2;
1124         leaderActorContext.setCommitIndex(leaderCommitIndex);
1125         leaderActorContext.setLastApplied(leaderCommitIndex);
1126
1127         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1128
1129         followerActorContext.setReplicatedLog(
1130                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1131         followerActorContext.setCommitIndex(1);
1132         followerActorContext.setLastApplied(1);
1133
1134         Follower follower = new Follower(followerActorContext);
1135         followerActor.underlyingActor().setBehavior(follower);
1136
1137         leader = new Leader(leaderActorContext);
1138
1139         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1140         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1141
1142         MessageCollectorActor.clearMessages(followerActor);
1143         MessageCollectorActor.clearMessages(leaderActor);
1144
1145         // Verify initial AppendEntries sent with the leader's current commit index.
1146         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1147         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1148         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1149
1150         leaderActor.underlyingActor().setBehavior(leader);
1151
1152         leader.handleMessage(followerActor, appendEntriesReply);
1153
1154         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1155         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1156
1157         // Verify AppendEntries sent with the leader's second log entry.
1158         appendEntries = appendEntriesList.get(0);
1159         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1160         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1161         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
1162         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1163
1164         // Verify AppendEntries sent with the leader's third log entry.
1165         appendEntries = appendEntriesList.get(1);
1166         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1167         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1168         assertEquals("Log entry index", 2, appendEntries.getEntries().get(0).getIndex());
1169         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1170
1171         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1172         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1173
1174         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1175         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1176     }
1177
1178     @Test
1179     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1180         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1181
1182         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1183         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1184                 new FiniteDuration(1000, TimeUnit.SECONDS));
1185
1186         leaderActorContext.setReplicatedLog(
1187                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1188         long leaderCommitIndex = 1;
1189         leaderActorContext.setCommitIndex(leaderCommitIndex);
1190         leaderActorContext.setLastApplied(leaderCommitIndex);
1191
1192         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1193
1194         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1195         followerActorContext.setCommitIndex(-1);
1196         followerActorContext.setLastApplied(-1);
1197
1198         Follower follower = new Follower(followerActorContext);
1199         followerActor.underlyingActor().setBehavior(follower);
1200
1201         leader = new Leader(leaderActorContext);
1202
1203         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1204         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1205
1206         MessageCollectorActor.clearMessages(followerActor);
1207         MessageCollectorActor.clearMessages(leaderActor);
1208
1209         // Verify initial AppendEntries sent with the leader's current commit index.
1210         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1211         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1212         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1213
1214         leaderActor.underlyingActor().setBehavior(leader);
1215
1216         leader.handleMessage(followerActor, appendEntriesReply);
1217
1218         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1219         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1220
1221         // Verify AppendEntries sent with the leader's first log entry.
1222         appendEntries = appendEntriesList.get(0);
1223         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1224         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1225         assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex());
1226         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1227
1228         // Verify AppendEntries sent with the leader's second log entry.
1229         appendEntries = appendEntriesList.get(1);
1230         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1231         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1232         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
1233         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1234
1235         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1236         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1237
1238         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1239         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1240     }
1241
1242     @Test
1243     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1244         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1245
1246         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1247         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1248                 new FiniteDuration(1000, TimeUnit.SECONDS));
1249
1250         leaderActorContext.setReplicatedLog(
1251                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1252         long leaderCommitIndex = 1;
1253         leaderActorContext.setCommitIndex(leaderCommitIndex);
1254         leaderActorContext.setLastApplied(leaderCommitIndex);
1255
1256         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1257         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1258
1259         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1260
1261         followerActorContext.setReplicatedLog(
1262                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1263         followerActorContext.setCommitIndex(-1);
1264         followerActorContext.setLastApplied(-1);
1265
1266         Follower follower = new Follower(followerActorContext);
1267         followerActor.underlyingActor().setBehavior(follower);
1268
1269         leader = new Leader(leaderActorContext);
1270
1271         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1272         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1273
1274         MessageCollectorActor.clearMessages(followerActor);
1275         MessageCollectorActor.clearMessages(leaderActor);
1276
1277         // Verify initial AppendEntries sent with the leader's current commit index.
1278         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1279         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1280         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1281
1282         leaderActor.underlyingActor().setBehavior(leader);
1283
1284         leader.handleMessage(followerActor, appendEntriesReply);
1285
1286         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1287         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1288
1289         // Verify AppendEntries sent with the leader's first log entry.
1290         appendEntries = appendEntriesList.get(0);
1291         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1292         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1293         assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex());
1294         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1295
1296         // Verify AppendEntries sent with the leader's third log entry.
1297         appendEntries = appendEntriesList.get(1);
1298         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1299         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1300         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
1301         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1302
1303         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1304         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1305
1306         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1307
1308         ApplyState applyState = applyStateList.get(0);
1309         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1310         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1311         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1312                 applyState.getReplicatedLogEntry().getData());
1313
1314         applyState = applyStateList.get(1);
1315         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1316         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1317         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1318                 applyState.getReplicatedLogEntry().getData());
1319
1320         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1321         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1322         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1323     }
1324
1325     @Test
1326     public void testHandleAppendEntriesReplySuccess() throws Exception {
1327         logStart("testHandleAppendEntriesReplySuccess");
1328
1329         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1330
1331         leaderActorContext.setReplicatedLog(
1332                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1333
1334         leaderActorContext.setCommitIndex(1);
1335         leaderActorContext.setLastApplied(1);
1336         leaderActorContext.getTermInformation().update(1, "leader");
1337
1338         leader = new Leader(leaderActorContext);
1339
1340         short payloadVersion = 5;
1341         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1342
1343         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1344
1345         assertEquals(RaftState.Leader, raftActorBehavior.state());
1346
1347         assertEquals(2, leaderActorContext.getCommitIndex());
1348
1349         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1350                 leaderActor, ApplyJournalEntries.class);
1351
1352         assertEquals(2, leaderActorContext.getLastApplied());
1353
1354         assertEquals(2, applyJournalEntries.getToIndex());
1355
1356         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1357                 ApplyState.class);
1358
1359         assertEquals(1,applyStateList.size());
1360
1361         ApplyState applyState = applyStateList.get(0);
1362
1363         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1364
1365         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1366         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1367     }
1368
1369     @Test
1370     public void testHandleAppendEntriesReplyUnknownFollower(){
1371         logStart("testHandleAppendEntriesReplyUnknownFollower");
1372
1373         MockRaftActorContext leaderActorContext = createActorContext();
1374
1375         leader = new Leader(leaderActorContext);
1376
1377         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1378
1379         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1380
1381         assertEquals(RaftState.Leader, raftActorBehavior.state());
1382     }
1383
1384     @Test
1385     public void testHandleRequestVoteReply(){
1386         logStart("testHandleRequestVoteReply");
1387
1388         MockRaftActorContext leaderActorContext = createActorContext();
1389
1390         leader = new Leader(leaderActorContext);
1391
1392         // Should be a no-op.
1393         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1394                 new RequestVoteReply(1, true));
1395
1396         assertEquals(RaftState.Leader, raftActorBehavior.state());
1397
1398         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1399
1400         assertEquals(RaftState.Leader, raftActorBehavior.state());
1401     }
1402
1403     @Test
1404     public void testIsolatedLeaderCheckNoFollowers() {
1405         logStart("testIsolatedLeaderCheckNoFollowers");
1406
1407         MockRaftActorContext leaderActorContext = createActorContext();
1408
1409         leader = new Leader(leaderActorContext);
1410         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1411         Assert.assertTrue(behavior instanceof Leader);
1412     }
1413
1414     @Test
1415     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1416         logStart("testIsolatedLeaderCheckTwoFollowers");
1417
1418         new JavaTestKit(getSystem()) {{
1419
1420             ActorRef followerActor1 = getTestActor();
1421             ActorRef followerActor2 = getTestActor();
1422
1423             MockRaftActorContext leaderActorContext = createActorContext();
1424
1425             Map<String, String> peerAddresses = new HashMap<>();
1426             peerAddresses.put("follower-1", followerActor1.path().toString());
1427             peerAddresses.put("follower-2", followerActor2.path().toString());
1428
1429             leaderActorContext.setPeerAddresses(peerAddresses);
1430
1431             leader = new Leader(leaderActorContext);
1432
1433             leader.markFollowerActive("follower-1");
1434             leader.markFollowerActive("follower-2");
1435             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1436             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1437                 behavior instanceof Leader);
1438
1439             // kill 1 follower and verify if that got killed
1440             final JavaTestKit probe = new JavaTestKit(getSystem());
1441             probe.watch(followerActor1);
1442             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1443             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1444             assertEquals(termMsg1.getActor(), followerActor1);
1445
1446             leader.markFollowerInActive("follower-1");
1447             leader.markFollowerActive("follower-2");
1448             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1449             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1450                 behavior instanceof Leader);
1451
1452             // kill 2nd follower and leader should change to Isolated leader
1453             followerActor2.tell(PoisonPill.getInstance(), null);
1454             probe.watch(followerActor2);
1455             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1456             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1457             assertEquals(termMsg2.getActor(), followerActor2);
1458
1459             leader.markFollowerInActive("follower-2");
1460             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1461             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1462                 behavior instanceof IsolatedLeader);
1463         }};
1464     }
1465
1466
1467     @Test
1468     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1469         logStart("testAppendEntryCallAtEndofAppendEntryReply");
1470
1471         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1472
1473         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1474         //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1475         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1476
1477         leaderActorContext.setConfigParams(configParams);
1478
1479         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1480
1481         followerActorContext.setConfigParams(configParams);
1482         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1483
1484         Follower follower = new Follower(followerActorContext);
1485         followerActor.underlyingActor().setBehavior(follower);
1486
1487         leaderActorContext.getReplicatedLog().removeFrom(0);
1488         leaderActorContext.setCommitIndex(-1);
1489         leaderActorContext.setLastApplied(-1);
1490
1491         followerActorContext.getReplicatedLog().removeFrom(0);
1492         followerActorContext.setCommitIndex(-1);
1493         followerActorContext.setLastApplied(-1);
1494
1495         leader = new Leader(leaderActorContext);
1496
1497         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1498                 leaderActor, AppendEntriesReply.class);
1499
1500         leader.handleMessage(followerActor, appendEntriesReply);
1501
1502         // Clear initial heartbeat messages
1503
1504         leaderActor.underlyingActor().clear();
1505         followerActor.underlyingActor().clear();
1506
1507         // create 3 entries
1508         leaderActorContext.setReplicatedLog(
1509                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1510         leaderActorContext.setCommitIndex(1);
1511         leaderActorContext.setLastApplied(1);
1512
1513         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1514                 TimeUnit.MILLISECONDS);
1515
1516         leader.handleMessage(leaderActor, new SendHeartBeat());
1517
1518         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1519
1520         // Should send first log entry
1521         assertEquals(1, appendEntries.getLeaderCommit());
1522         assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1523         assertEquals(-1, appendEntries.getPrevLogIndex());
1524
1525         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1526
1527         assertEquals(1, appendEntriesReply.getLogLastTerm());
1528         assertEquals(0, appendEntriesReply.getLogLastIndex());
1529
1530         followerActor.underlyingActor().clear();
1531
1532         leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1533
1534         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1535
1536         // Should send second log entry
1537         assertEquals(1, appendEntries.getLeaderCommit());
1538         assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1539
1540         follower.close();
1541     }
1542
1543     @Test
1544     public void testLaggingFollowerStarvation() throws Exception {
1545         logStart("testLaggingFollowerStarvation");
1546         new JavaTestKit(getSystem()) {{
1547             String leaderActorId = actorFactory.generateActorId("leader");
1548             String follower1ActorId = actorFactory.generateActorId("follower");
1549             String follower2ActorId = actorFactory.generateActorId("follower");
1550
1551             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1552                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1553             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1554             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1555
1556             MockRaftActorContext leaderActorContext =
1557                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1558
1559             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1560             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1561             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1562
1563             leaderActorContext.setConfigParams(configParams);
1564
1565             leaderActorContext.setReplicatedLog(
1566                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1567
1568             Map<String, String> peerAddresses = new HashMap<>();
1569             peerAddresses.put(follower1ActorId,
1570                     follower1Actor.path().toString());
1571             peerAddresses.put(follower2ActorId,
1572                     follower2Actor.path().toString());
1573
1574             leaderActorContext.setPeerAddresses(peerAddresses);
1575             leaderActorContext.getTermInformation().update(1, leaderActorId);
1576
1577             RaftActorBehavior leader = createBehavior(leaderActorContext);
1578
1579             leaderActor.underlyingActor().setBehavior(leader);
1580
1581             for(int i=1;i<6;i++) {
1582                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1583                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1584                 assertTrue(newBehavior == leader);
1585                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1586             }
1587
1588             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1589             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1590
1591             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1592                     heartbeats.size() > 1);
1593
1594             // Check if follower-2 got AppendEntries during this time and was not starved
1595             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1596
1597             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1598                     appendEntries.size() > 1);
1599
1600         }};
1601     }
1602
1603     @Override
1604     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1605             ActorRef actorRef, RaftRPC rpc) throws Exception {
1606         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1607         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1608     }
1609
1610     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1611
1612         private final long electionTimeOutIntervalMillis;
1613         private final int snapshotChunkSize;
1614
1615         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1616             super();
1617             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1618             this.snapshotChunkSize = snapshotChunkSize;
1619         }
1620
1621         @Override
1622         public FiniteDuration getElectionTimeOutInterval() {
1623             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1624         }
1625
1626         @Override
1627         public int getSnapshotChunkSize() {
1628             return snapshotChunkSize;
1629         }
1630     }
1631 }