Merge "Re-enable running of SingleFeaturesTest"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
29 import org.opendaylight.controller.cluster.raft.SerializationUtils;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
34 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
37 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
40 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
47 import scala.concurrent.duration.FiniteDuration;
48
49 public class LeaderTest extends AbstractLeaderTest {
50
    // Follower id used by the tests when building follower replies and when looking the follower up on the leader.
    static final String FOLLOWER_ID = "follower";

    // Test actor passed to the behavior as the leader-side actor; tests inspect it for the messages
    // (e.g. ApplyState, CaptureSnapshot) collected on the leader side.
    private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
            Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));

    // Test actor standing in for the follower; tests inspect it for the AppendEntries / InstallSnapshot
    // messages sent by the leader.
    private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
            Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));

    // Leader behavior under test; assigned by each test and closed in tearDown() when non-null.
    private Leader leader;
60
61     @Override
62     @After
63     public void tearDown() throws Exception {
64         if(leader != null) {
65             leader.close();
66         }
67
68         super.tearDown();
69     }
70
71     @Test
72     public void testHandleMessageForUnknownMessage() throws Exception {
73         logStart("testHandleMessageForUnknownMessage");
74
75         leader = new Leader(createActorContext());
76
77         // handle message should return the Leader state when it receives an
78         // unknown message
79         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
80         Assert.assertTrue(behavior instanceof Leader);
81     }
82
83     @Test
84     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
85         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
86
87         MockRaftActorContext actorContext = createActorContextWithFollower();
88
89         long term = 1;
90         actorContext.getTermInformation().update(term, "");
91
92         leader = new Leader(actorContext);
93
94         // Leader should send an immediate heartbeat with no entries as follower is inactive.
95         long lastIndex = actorContext.getReplicatedLog().lastIndex();
96         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
97         assertEquals("getTerm", term, appendEntries.getTerm());
98         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
99         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
100         assertEquals("Entries size", 0, appendEntries.getEntries().size());
101
102         // The follower would normally reply - simulate that explicitly here.
103         leader.handleMessage(followerActor, new AppendEntriesReply(
104                 FOLLOWER_ID, term, true, lastIndex - 1, term));
105         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
106
107         followerActor.underlyingActor().clear();
108
109         // Sleep for the heartbeat interval so AppendEntries is sent.
110         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
111                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
112
113         leader.handleMessage(leaderActor, new SendHeartBeat());
114
115         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
116         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
117         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
118         assertEquals("Entries size", 1, appendEntries.getEntries().size());
119         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
120         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
121     }
122
123     @Test
124     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
125         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
126
127         MockRaftActorContext actorContext = createActorContextWithFollower();
128
129         long term = 1;
130         actorContext.getTermInformation().update(term, "");
131
132         leader = new Leader(actorContext);
133
134         // Leader will send an immediate heartbeat - ignore it.
135         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
136
137         // The follower would normally reply - simulate that explicitly here.
138         long lastIndex = actorContext.getReplicatedLog().lastIndex();
139         leader.handleMessage(followerActor, new AppendEntriesReply(
140                 FOLLOWER_ID, term, true, lastIndex, term));
141         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
142
143         followerActor.underlyingActor().clear();
144
145         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
146         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
147                 1, lastIndex + 1, payload);
148         actorContext.getReplicatedLog().append(newEntry);
149         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
150                 new Replicate(null, null, newEntry));
151
152         // State should not change
153         assertTrue(raftBehavior instanceof Leader);
154
155         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
156         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
157         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
158         assertEquals("Entries size", 1, appendEntries.getEntries().size());
159         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
160         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
161         assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
162     }
163
164     @Test
165     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
166         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
167
168         MockRaftActorContext actorContext = createActorContext();
169
170         leader = new Leader(actorContext);
171
172         actorContext.setLastApplied(0);
173
174         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
175         long term = actorContext.getTermInformation().getCurrentTerm();
176         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
177                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
178
179         actorContext.getReplicatedLog().append(newEntry);
180
181         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
182                 new Replicate(leaderActor, "state-id", newEntry));
183
184         // State should not change
185         assertTrue(raftBehavior instanceof Leader);
186
187         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
188
189         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
190         // one since lastApplied state is 0.
191         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
192                 leaderActor, ApplyState.class);
193         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
194
195         for(int i = 0; i <= newLogIndex - 1; i++ ) {
196             ApplyState applyState = applyStateList.get(i);
197             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
198             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
199         }
200
201         ApplyState last = applyStateList.get((int) newLogIndex - 1);
202         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
203         assertEquals("getIdentifier", "state-id", last.getIdentifier());
204     }
205
206     @Test
207     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
208         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
209
210         MockRaftActorContext actorContext = createActorContextWithFollower();
211
212         Map<String, String> leadersSnapshot = new HashMap<>();
213         leadersSnapshot.put("1", "A");
214         leadersSnapshot.put("2", "B");
215         leadersSnapshot.put("3", "C");
216
217         //clears leaders log
218         actorContext.getReplicatedLog().removeFrom(0);
219
220         final int followersLastIndex = 2;
221         final int snapshotIndex = 3;
222         final int newEntryIndex = 4;
223         final int snapshotTerm = 1;
224         final int currentTerm = 2;
225
226         // set the snapshot variables in replicatedlog
227         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
228         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
229         actorContext.setCommitIndex(followersLastIndex);
230         //set follower timeout to 2 mins, helps during debugging
231         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
232
233         leader = new Leader(actorContext);
234
235         // new entry
236         ReplicatedLogImplEntry entry =
237                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
238                         new MockRaftActorContext.MockPayload("D"));
239
240         //update follower timestamp
241         leader.markFollowerActive(FOLLOWER_ID);
242
243         ByteString bs = toByteString(leadersSnapshot);
244         leader.setSnapshot(Optional.of(bs));
245         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
246         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
247
248         //send first chunk and no InstallSnapshotReply received yet
249         fts.getNextChunk();
250         fts.incrementChunkIndex();
251
252         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
253                 TimeUnit.MILLISECONDS);
254
255         leader.handleMessage(leaderActor, new SendHeartBeat());
256
257         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
258
259         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
260
261         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
262
263         //InstallSnapshotReply received
264         fts.markSendStatus(true);
265
266         leader.handleMessage(leaderActor, new SendHeartBeat());
267
268         InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor,
269                 InstallSnapshot.SERIALIZABLE_CLASS);
270
271         InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
272
273         assertEquals(snapshotIndex, is.getLastIncludedIndex());
274     }
275
276     @Test
277     public void testSendAppendEntriesSnapshotScenario() throws Exception {
278         logStart("testSendAppendEntriesSnapshotScenario");
279
280         MockRaftActorContext actorContext = createActorContextWithFollower();
281
282         Map<String, String> leadersSnapshot = new HashMap<>();
283         leadersSnapshot.put("1", "A");
284         leadersSnapshot.put("2", "B");
285         leadersSnapshot.put("3", "C");
286
287         //clears leaders log
288         actorContext.getReplicatedLog().removeFrom(0);
289
290         final int followersLastIndex = 2;
291         final int snapshotIndex = 3;
292         final int newEntryIndex = 4;
293         final int snapshotTerm = 1;
294         final int currentTerm = 2;
295
296         // set the snapshot variables in replicatedlog
297         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
298         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
299         actorContext.setCommitIndex(followersLastIndex);
300
301         leader = new Leader(actorContext);
302
303         // Leader will send an immediate heartbeat - ignore it.
304         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
305
306         // new entry
307         ReplicatedLogImplEntry entry =
308                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
309                         new MockRaftActorContext.MockPayload("D"));
310
311         //update follower timestamp
312         leader.markFollowerActive(FOLLOWER_ID);
313
314         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
315         RaftActorBehavior raftBehavior = leader.handleMessage(
316                 leaderActor, new Replicate(null, "state-id", entry));
317
318         assertTrue(raftBehavior instanceof Leader);
319
320         MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
321     }
322
323     @Test
324     public void testInitiateInstallSnapshot() throws Exception {
325         logStart("testInitiateInstallSnapshot");
326
327         MockRaftActorContext actorContext = createActorContextWithFollower();
328
329         Map<String, String> leadersSnapshot = new HashMap<>();
330         leadersSnapshot.put("1", "A");
331         leadersSnapshot.put("2", "B");
332         leadersSnapshot.put("3", "C");
333
334         //clears leaders log
335         actorContext.getReplicatedLog().removeFrom(0);
336
337         final int followersLastIndex = 2;
338         final int snapshotIndex = 3;
339         final int newEntryIndex = 4;
340         final int snapshotTerm = 1;
341         final int currentTerm = 2;
342
343         // set the snapshot variables in replicatedlog
344         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
345         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
346         actorContext.setLastApplied(3);
347         actorContext.setCommitIndex(followersLastIndex);
348
349         leader = new Leader(actorContext);
350
351         // Leader will send an immediate heartbeat - ignore it.
352         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
353
354         // set the snapshot as absent and check if capture-snapshot is invoked.
355         leader.setSnapshot(Optional.<ByteString>absent());
356
357         // new entry
358         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
359                 new MockRaftActorContext.MockPayload("D"));
360
361         actorContext.getReplicatedLog().append(entry);
362
363         //update follower timestamp
364         leader.markFollowerActive(FOLLOWER_ID);
365
366         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
367
368         CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
369
370         assertTrue(cs.isInstallSnapshotInitiated());
371         assertEquals(3, cs.getLastAppliedIndex());
372         assertEquals(1, cs.getLastAppliedTerm());
373         assertEquals(4, cs.getLastIndex());
374         assertEquals(2, cs.getLastTerm());
375
376         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
377         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
378
379         List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
380         assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
381     }
382
383     @Test
384     public void testInstallSnapshot() throws Exception {
385         logStart("testInstallSnapshot");
386
387         MockRaftActorContext actorContext = createActorContextWithFollower();
388
389         Map<String, String> leadersSnapshot = new HashMap<>();
390         leadersSnapshot.put("1", "A");
391         leadersSnapshot.put("2", "B");
392         leadersSnapshot.put("3", "C");
393
394         //clears leaders log
395         actorContext.getReplicatedLog().removeFrom(0);
396
397         final int followersLastIndex = 2;
398         final int snapshotIndex = 3;
399         final int snapshotTerm = 1;
400         final int currentTerm = 2;
401
402         // set the snapshot variables in replicatedlog
403         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
404         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
405         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
406         actorContext.setCommitIndex(followersLastIndex);
407
408         leader = new Leader(actorContext);
409
410         // Ignore initial heartbeat.
411         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
412
413         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
414                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
415
416         assertTrue(raftBehavior instanceof Leader);
417
418         // check if installsnapshot gets called with the correct values.
419
420         InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable(
421                 MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class));
422
423         assertNotNull(installSnapshot.getData());
424         assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
425         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
426
427         assertEquals(currentTerm, installSnapshot.getTerm());
428     }
429
430     @Test
431     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
432         logStart("testHandleInstallSnapshotReplyLastChunk");
433
434         MockRaftActorContext actorContext = createActorContextWithFollower();
435
436         final int followersLastIndex = 2;
437         final int snapshotIndex = 3;
438         final int snapshotTerm = 1;
439         final int currentTerm = 2;
440
441         actorContext.setCommitIndex(followersLastIndex);
442
443         leader = new Leader(actorContext);
444
445         // Ignore initial heartbeat.
446         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
447
448         Map<String, String> leadersSnapshot = new HashMap<>();
449         leadersSnapshot.put("1", "A");
450         leadersSnapshot.put("2", "B");
451         leadersSnapshot.put("3", "C");
452
453         // set the snapshot variables in replicatedlog
454
455         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
456         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
457         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
458
459         ByteString bs = toByteString(leadersSnapshot);
460         leader.setSnapshot(Optional.of(bs));
461         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
462         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
463         while(!fts.isLastChunk(fts.getChunkIndex())) {
464             fts.getNextChunk();
465             fts.incrementChunkIndex();
466         }
467
468         //clears leaders log
469         actorContext.getReplicatedLog().removeFrom(0);
470
471         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
472                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
473
474         assertTrue(raftBehavior instanceof Leader);
475
476         assertEquals(0, leader.followerSnapshotSize());
477         assertEquals(1, leader.followerLogSize());
478         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
479         assertNotNull(fli);
480         assertEquals(snapshotIndex, fli.getMatchIndex());
481         assertEquals(snapshotIndex, fli.getMatchIndex());
482         assertEquals(snapshotIndex + 1, fli.getNextIndex());
483     }
484
485     @Test
486     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
487         logStart("testSendSnapshotfromInstallSnapshotReply");
488
489         MockRaftActorContext actorContext = createActorContextWithFollower();
490
491         final int followersLastIndex = 2;
492         final int snapshotIndex = 3;
493         final int snapshotTerm = 1;
494         final int currentTerm = 2;
495
496         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
497             @Override
498             public int getSnapshotChunkSize() {
499                 return 50;
500             }
501         };
502         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
503         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
504
505         actorContext.setConfigParams(configParams);
506         actorContext.setCommitIndex(followersLastIndex);
507
508         leader = new Leader(actorContext);
509
510         Map<String, String> leadersSnapshot = new HashMap<>();
511         leadersSnapshot.put("1", "A");
512         leadersSnapshot.put("2", "B");
513         leadersSnapshot.put("3", "C");
514
515         // set the snapshot variables in replicatedlog
516         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
517         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
518         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
519
520         ByteString bs = toByteString(leadersSnapshot);
521         leader.setSnapshot(Optional.of(bs));
522
523         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
524
525         InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
526                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
527
528         assertEquals(1, installSnapshot.getChunkIndex());
529         assertEquals(3, installSnapshot.getTotalChunks());
530
531         followerActor.underlyingActor().clear();
532         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
533                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
534
535         installSnapshot = MessageCollectorActor.expectFirstMatching(
536                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
537
538         assertEquals(2, installSnapshot.getChunkIndex());
539         assertEquals(3, installSnapshot.getTotalChunks());
540
541         followerActor.underlyingActor().clear();
542         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
543                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
544
545         installSnapshot = MessageCollectorActor.expectFirstMatching(
546                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
547
548         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
549         followerActor.underlyingActor().clear();
550         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
551                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
552
553         installSnapshot = MessageCollectorActor.getFirstMatching(
554                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
555
556         Assert.assertNull(installSnapshot);
557     }
558
559
560     @Test
561     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
562         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
563
564         MockRaftActorContext actorContext = createActorContextWithFollower();
565
566         final int followersLastIndex = 2;
567         final int snapshotIndex = 3;
568         final int snapshotTerm = 1;
569         final int currentTerm = 2;
570
571         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
572             @Override
573             public int getSnapshotChunkSize() {
574                 return 50;
575             }
576         });
577
578         actorContext.setCommitIndex(followersLastIndex);
579
580         leader = new Leader(actorContext);
581
582         Map<String, String> leadersSnapshot = new HashMap<>();
583         leadersSnapshot.put("1", "A");
584         leadersSnapshot.put("2", "B");
585         leadersSnapshot.put("3", "C");
586
587         // set the snapshot variables in replicatedlog
588         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
589         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
590         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
591
592         ByteString bs = toByteString(leadersSnapshot);
593         leader.setSnapshot(Optional.of(bs));
594
595         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
596
597         InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
598                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
599
600         assertEquals(1, installSnapshot.getChunkIndex());
601         assertEquals(3, installSnapshot.getTotalChunks());
602
603         followerActor.underlyingActor().clear();
604
605         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
606                 FOLLOWER_ID, -1, false));
607
608         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
609                 TimeUnit.MILLISECONDS);
610
611         leader.handleMessage(leaderActor, new SendHeartBeat());
612
613         installSnapshot = MessageCollectorActor.expectFirstMatching(
614                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
615
616         assertEquals(1, installSnapshot.getChunkIndex());
617         assertEquals(3, installSnapshot.getTotalChunks());
618     }
619
620     @Test
621     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
622         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
623
624         MockRaftActorContext actorContext = createActorContextWithFollower();
625
626         final int followersLastIndex = 2;
627         final int snapshotIndex = 3;
628         final int snapshotTerm = 1;
629         final int currentTerm = 2;
630
631         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
632             @Override
633             public int getSnapshotChunkSize() {
634                 return 50;
635             }
636         });
637
638         actorContext.setCommitIndex(followersLastIndex);
639
640         leader = new Leader(actorContext);
641
642         Map<String, String> leadersSnapshot = new HashMap<>();
643         leadersSnapshot.put("1", "A");
644         leadersSnapshot.put("2", "B");
645         leadersSnapshot.put("3", "C");
646
647         // set the snapshot variables in replicatedlog
648         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
649         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
650         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
651
652         ByteString bs = toByteString(leadersSnapshot);
653         leader.setSnapshot(Optional.of(bs));
654
655         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
656
657         InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
658                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
659
660         assertEquals(1, installSnapshot.getChunkIndex());
661         assertEquals(3, installSnapshot.getTotalChunks());
662         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
663
664         int hashCode = installSnapshot.getData().hashCode();
665
666         followerActor.underlyingActor().clear();
667
668         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
669                 FOLLOWER_ID, 1, true));
670
671         installSnapshot = MessageCollectorActor.expectFirstMatching(
672                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
673
674         assertEquals(2, installSnapshot.getChunkIndex());
675         assertEquals(3, installSnapshot.getTotalChunks());
676         assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
677     }
678
679     @Test
680     public void testFollowerToSnapshotLogic() {
681         logStart("testFollowerToSnapshotLogic");
682
683         MockRaftActorContext actorContext = createActorContext();
684
685         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
686             @Override
687             public int getSnapshotChunkSize() {
688                 return 50;
689             }
690         });
691
692         leader = new Leader(actorContext);
693
694         Map<String, String> leadersSnapshot = new HashMap<>();
695         leadersSnapshot.put("1", "A");
696         leadersSnapshot.put("2", "B");
697         leadersSnapshot.put("3", "C");
698
699         ByteString bs = toByteString(leadersSnapshot);
700         byte[] barray = bs.toByteArray();
701
702         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
703         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
704
705         assertEquals(bs.size(), barray.length);
706
707         int chunkIndex=0;
708         for (int i=0; i < barray.length; i = i + 50) {
709             int j = i + 50;
710             chunkIndex++;
711
712             if (i + 50 > barray.length) {
713                 j = barray.length;
714             }
715
716             ByteString chunk = fts.getNextChunk();
717             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
718             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
719
720             fts.markSendStatus(true);
721             if (!fts.isLastChunk(chunkIndex)) {
722                 fts.incrementChunkIndex();
723             }
724         }
725
726         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
727     }
728
729     @Override protected RaftActorBehavior createBehavior(
730         RaftActorContext actorContext) {
731         return new Leader(actorContext);
732     }
733
734     @Override
735     protected MockRaftActorContext createActorContext() {
736         return createActorContext(leaderActor);
737     }
738
739     @Override
740     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
741         return createActorContext("leader", actorRef);
742     }
743
744     private MockRaftActorContext createActorContextWithFollower() {
745         MockRaftActorContext actorContext = createActorContext();
746         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
747                 followerActor.path().toString()).build());
748         return actorContext;
749     }
750
751     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
752         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
753         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
754         configParams.setElectionTimeoutFactor(100000);
755         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
756         context.setConfigParams(configParams);
757         return context;
758     }
759
760     @Test
761     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
762         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
763
764         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
765
766         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
767
768         Follower follower = new Follower(followerActorContext);
769         followerActor.underlyingActor().setBehavior(follower);
770
771         Map<String, String> peerAddresses = new HashMap<>();
772         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
773
774         leaderActorContext.setPeerAddresses(peerAddresses);
775
776         leaderActorContext.getReplicatedLog().removeFrom(0);
777
778         //create 3 entries
779         leaderActorContext.setReplicatedLog(
780                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
781
782         leaderActorContext.setCommitIndex(1);
783
784         followerActorContext.getReplicatedLog().removeFrom(0);
785
786         // follower too has the exact same log entries and has the same commit index
787         followerActorContext.setReplicatedLog(
788                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
789
790         followerActorContext.setCommitIndex(1);
791
792         leader = new Leader(leaderActorContext);
793
794         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
795
796         assertEquals(1, appendEntries.getLeaderCommit());
797         assertEquals(0, appendEntries.getEntries().size());
798         assertEquals(0, appendEntries.getPrevLogIndex());
799
800         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
801                 leaderActor, AppendEntriesReply.class);
802
803         assertEquals(2, appendEntriesReply.getLogLastIndex());
804         assertEquals(1, appendEntriesReply.getLogLastTerm());
805
806         // follower returns its next index
807         assertEquals(2, appendEntriesReply.getLogLastIndex());
808         assertEquals(1, appendEntriesReply.getLogLastTerm());
809
810         follower.close();
811     }
812
813     @Test
814     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
815         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
816
817         MockRaftActorContext leaderActorContext = createActorContext();
818
819         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
820
821         Follower follower = new Follower(followerActorContext);
822         followerActor.underlyingActor().setBehavior(follower);
823
824         Map<String, String> peerAddresses = new HashMap<>();
825         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
826
827         leaderActorContext.setPeerAddresses(peerAddresses);
828
829         leaderActorContext.getReplicatedLog().removeFrom(0);
830
831         leaderActorContext.setReplicatedLog(
832                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
833
834         leaderActorContext.setCommitIndex(1);
835
836         followerActorContext.getReplicatedLog().removeFrom(0);
837
838         followerActorContext.setReplicatedLog(
839                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
840
841         // follower has the same log entries but its commit index > leaders commit index
842         followerActorContext.setCommitIndex(2);
843
844         leader = new Leader(leaderActorContext);
845
846         // Initial heartbeat
847         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
848
849         assertEquals(1, appendEntries.getLeaderCommit());
850         assertEquals(0, appendEntries.getEntries().size());
851         assertEquals(0, appendEntries.getPrevLogIndex());
852
853         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
854                 leaderActor, AppendEntriesReply.class);
855
856         assertEquals(2, appendEntriesReply.getLogLastIndex());
857         assertEquals(1, appendEntriesReply.getLogLastTerm());
858
859         leaderActor.underlyingActor().setBehavior(follower);
860         leader.handleMessage(followerActor, appendEntriesReply);
861
862         leaderActor.underlyingActor().clear();
863         followerActor.underlyingActor().clear();
864
865         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
866                 TimeUnit.MILLISECONDS);
867
868         leader.handleMessage(leaderActor, new SendHeartBeat());
869
870         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
871
872         assertEquals(2, appendEntries.getLeaderCommit());
873         assertEquals(0, appendEntries.getEntries().size());
874         assertEquals(2, appendEntries.getPrevLogIndex());
875
876         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
877
878         assertEquals(2, appendEntriesReply.getLogLastIndex());
879         assertEquals(1, appendEntriesReply.getLogLastTerm());
880
881         assertEquals(2, followerActorContext.getCommitIndex());
882
883         follower.close();
884     }
885
886     @Test
887     public void testHandleAppendEntriesReplyFailure(){
888         logStart("testHandleAppendEntriesReplyFailure");
889
890         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
891
892         leader = new Leader(leaderActorContext);
893
894         // Send initial heartbeat reply with last index.
895         leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
896
897         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
898         assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
899
900         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
901
902         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
903
904         assertEquals(RaftState.Leader, raftActorBehavior.state());
905
906         assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
907     }
908
909     @Test
910     public void testHandleAppendEntriesReplySuccess() throws Exception {
911         logStart("testHandleAppendEntriesReplySuccess");
912
913         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
914
915         leaderActorContext.setReplicatedLog(
916                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
917
918         leaderActorContext.setCommitIndex(1);
919         leaderActorContext.setLastApplied(1);
920         leaderActorContext.getTermInformation().update(1, "leader");
921
922         leader = new Leader(leaderActorContext);
923
924         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
925
926         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
927
928         assertEquals(RaftState.Leader, raftActorBehavior.state());
929
930         assertEquals(2, leaderActorContext.getCommitIndex());
931
932         ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching(
933                 leaderActor, ApplyLogEntries.class);
934
935         assertEquals(2, leaderActorContext.getLastApplied());
936
937         assertEquals(2, applyLogEntries.getToIndex());
938
939         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
940                 ApplyState.class);
941
942         assertEquals(1,applyStateList.size());
943
944         ApplyState applyState = applyStateList.get(0);
945
946         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
947     }
948
949     @Test
950     public void testHandleAppendEntriesReplyUnknownFollower(){
951         logStart("testHandleAppendEntriesReplyUnknownFollower");
952
953         MockRaftActorContext leaderActorContext = createActorContext();
954
955         leader = new Leader(leaderActorContext);
956
957         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
958
959         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
960
961         assertEquals(RaftState.Leader, raftActorBehavior.state());
962     }
963
964     @Test
965     public void testHandleRequestVoteReply(){
966         logStart("testHandleRequestVoteReply");
967
968         MockRaftActorContext leaderActorContext = createActorContext();
969
970         leader = new Leader(leaderActorContext);
971
972         // Should be a no-op.
973         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
974                 new RequestVoteReply(1, true));
975
976         assertEquals(RaftState.Leader, raftActorBehavior.state());
977
978         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
979
980         assertEquals(RaftState.Leader, raftActorBehavior.state());
981     }
982
983     @Test
984     public void testIsolatedLeaderCheckNoFollowers() {
985         logStart("testIsolatedLeaderCheckNoFollowers");
986
987         MockRaftActorContext leaderActorContext = createActorContext();
988
989         leader = new Leader(leaderActorContext);
990         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
991         Assert.assertTrue(behavior instanceof Leader);
992     }
993
994     @Test
995     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
996         logStart("testIsolatedLeaderCheckTwoFollowers");
997
998         new JavaTestKit(getSystem()) {{
999
1000             ActorRef followerActor1 = getTestActor();
1001             ActorRef followerActor2 = getTestActor();
1002
1003             MockRaftActorContext leaderActorContext = createActorContext();
1004
1005             Map<String, String> peerAddresses = new HashMap<>();
1006             peerAddresses.put("follower-1", followerActor1.path().toString());
1007             peerAddresses.put("follower-2", followerActor2.path().toString());
1008
1009             leaderActorContext.setPeerAddresses(peerAddresses);
1010
1011             leader = new Leader(leaderActorContext);
1012
1013             leader.markFollowerActive("follower-1");
1014             leader.markFollowerActive("follower-2");
1015             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1016             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1017                 behavior instanceof Leader);
1018
1019             // kill 1 follower and verify if that got killed
1020             final JavaTestKit probe = new JavaTestKit(getSystem());
1021             probe.watch(followerActor1);
1022             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1023             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1024             assertEquals(termMsg1.getActor(), followerActor1);
1025
1026             leader.markFollowerInActive("follower-1");
1027             leader.markFollowerActive("follower-2");
1028             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1029             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1030                 behavior instanceof Leader);
1031
1032             // kill 2nd follower and leader should change to Isolated leader
1033             followerActor2.tell(PoisonPill.getInstance(), null);
1034             probe.watch(followerActor2);
1035             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1036             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1037             assertEquals(termMsg2.getActor(), followerActor2);
1038
1039             leader.markFollowerInActive("follower-2");
1040             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1041             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1042                 behavior instanceof IsolatedLeader);
1043         }};
1044     }
1045
1046
1047     @Test
1048     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1049         logStart("testAppendEntryCallAtEndofAppendEntryReply");
1050
1051         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1052
1053         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1054         //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1055         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1056
1057         leaderActorContext.setConfigParams(configParams);
1058
1059         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1060
1061         followerActorContext.setConfigParams(configParams);
1062
1063         Follower follower = new Follower(followerActorContext);
1064         followerActor.underlyingActor().setBehavior(follower);
1065
1066         leaderActorContext.getReplicatedLog().removeFrom(0);
1067         leaderActorContext.setCommitIndex(-1);
1068         leaderActorContext.setLastApplied(-1);
1069
1070         followerActorContext.getReplicatedLog().removeFrom(0);
1071         followerActorContext.setCommitIndex(-1);
1072         followerActorContext.setLastApplied(-1);
1073
1074         leader = new Leader(leaderActorContext);
1075
1076         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1077                 leaderActor, AppendEntriesReply.class);
1078
1079         leader.handleMessage(followerActor, appendEntriesReply);
1080
1081         // Clear initial heartbeat messages
1082
1083         leaderActor.underlyingActor().clear();
1084         followerActor.underlyingActor().clear();
1085
1086         // create 3 entries
1087         leaderActorContext.setReplicatedLog(
1088                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1089         leaderActorContext.setCommitIndex(1);
1090         leaderActorContext.setLastApplied(1);
1091
1092         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1093                 TimeUnit.MILLISECONDS);
1094
1095         leader.handleMessage(leaderActor, new SendHeartBeat());
1096
1097         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1098
1099         // Should send first log entry
1100         assertEquals(1, appendEntries.getLeaderCommit());
1101         assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1102         assertEquals(-1, appendEntries.getPrevLogIndex());
1103
1104         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1105
1106         assertEquals(1, appendEntriesReply.getLogLastTerm());
1107         assertEquals(0, appendEntriesReply.getLogLastIndex());
1108
1109         followerActor.underlyingActor().clear();
1110
1111         leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1112
1113         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1114
1115         // Should send second log entry
1116         assertEquals(1, appendEntries.getLeaderCommit());
1117         assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1118
1119         follower.close();
1120     }
1121
1122     @Test
1123     public void testLaggingFollowerStarvation() throws Exception {
1124         logStart("testLaggingFollowerStarvation");
1125         new JavaTestKit(getSystem()) {{
1126             String leaderActorId = actorFactory.generateActorId("leader");
1127             String follower1ActorId = actorFactory.generateActorId("follower");
1128             String follower2ActorId = actorFactory.generateActorId("follower");
1129
1130             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1131                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1132             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1133             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1134
1135             MockRaftActorContext leaderActorContext =
1136                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1137
1138             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1139             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1140             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1141
1142             leaderActorContext.setConfigParams(configParams);
1143
1144             leaderActorContext.setReplicatedLog(
1145                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1146
1147             Map<String, String> peerAddresses = new HashMap<>();
1148             peerAddresses.put(follower1ActorId,
1149                     follower1Actor.path().toString());
1150             peerAddresses.put(follower2ActorId,
1151                     follower2Actor.path().toString());
1152
1153             leaderActorContext.setPeerAddresses(peerAddresses);
1154             leaderActorContext.getTermInformation().update(1, leaderActorId);
1155
1156             RaftActorBehavior leader = createBehavior(leaderActorContext);
1157
1158             leaderActor.underlyingActor().setBehavior(leader);
1159
1160             for(int i=1;i<6;i++) {
1161                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1162                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
1163                 assertTrue(newBehavior == leader);
1164                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1165             }
1166
1167             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1168             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1169
1170             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1171                     heartbeats.size() > 1);
1172
1173             // Check if follower-2 got AppendEntries during this time and was not starved
1174             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1175
1176             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1177                     appendEntries.size() > 1);
1178
1179         }};
1180     }
1181
1182     @Override
1183     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1184             ActorRef actorRef, RaftRPC rpc) throws Exception {
1185         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1186         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1187     }
1188
1189     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1190
1191         private final long electionTimeOutIntervalMillis;
1192         private final int snapshotChunkSize;
1193
1194         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1195             super();
1196             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1197             this.snapshotChunkSize = snapshotChunkSize;
1198         }
1199
1200         @Override
1201         public FiniteDuration getElectionTimeOutInterval() {
1202             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1203         }
1204
1205         @Override
1206         public int getSnapshotChunkSize() {
1207             return snapshotChunkSize;
1208         }
1209     }
1210 }