Refactor FollowerTest
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
29 import org.opendaylight.controller.cluster.raft.SerializationUtils;
30 import org.opendaylight.controller.cluster.raft.TestActorFactory;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
35 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
38 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
45 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
46 import scala.concurrent.duration.FiniteDuration;
47
48 public class LeaderTest extends AbstractRaftActorBehaviorTest {
49
50     static final String FOLLOWER_ID = "follower";
51
52     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
53
54     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
55             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
56
57     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
58             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
59
60     private Leader leader;
61
62     @After
63     public void tearDown() throws Exception {
64         if(leader != null) {
65             leader.close();
66         }
67
68         actorFactory.close();
69     }
70
71     @Test
72     public void testHandleMessageForUnknownMessage() throws Exception {
73         logStart("testHandleMessageForUnknownMessage");
74
75         leader = new Leader(createActorContext());
76
77         // handle message should return the Leader state when it receives an
78         // unknown message
79         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
80         Assert.assertTrue(behavior instanceof Leader);
81     }
82
83     @Test
84     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
85         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
86
87         MockRaftActorContext actorContext = createActorContextWithFollower();
88
89         long term = 1;
90         actorContext.getTermInformation().update(term, "");
91
92         leader = new Leader(actorContext);
93
94         // Leader should send an immediate heartbeat with no entries as follower is inactive.
95         long lastIndex = actorContext.getReplicatedLog().lastIndex();
96         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
97         assertEquals("getTerm", term, appendEntries.getTerm());
98         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
99         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
100         assertEquals("Entries size", 0, appendEntries.getEntries().size());
101
102         // The follower would normally reply - simulate that explicitly here.
103         leader.handleMessage(followerActor, new AppendEntriesReply(
104                 FOLLOWER_ID, term, true, lastIndex - 1, term));
105         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
106
107         followerActor.underlyingActor().clear();
108
109         // Sleep for the heartbeat interval so AppendEntries is sent.
110         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
111                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
112
113         leader.handleMessage(leaderActor, new SendHeartBeat());
114
115         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
116         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
117         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
118         assertEquals("Entries size", 1, appendEntries.getEntries().size());
119         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
120         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
121     }
122
123     @Test
124     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
125         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
126
127         MockRaftActorContext actorContext = createActorContextWithFollower();
128
129         long term = 1;
130         actorContext.getTermInformation().update(term, "");
131
132         leader = new Leader(actorContext);
133
134         // Leader will send an immediate heartbeat - ignore it.
135         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
136
137         // The follower would normally reply - simulate that explicitly here.
138         long lastIndex = actorContext.getReplicatedLog().lastIndex();
139         leader.handleMessage(followerActor, new AppendEntriesReply(
140                 FOLLOWER_ID, term, true, lastIndex, term));
141         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
142
143         followerActor.underlyingActor().clear();
144
145         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
146         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
147                 1, lastIndex + 1, payload);
148         actorContext.getReplicatedLog().append(newEntry);
149         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
150                 new Replicate(null, null, newEntry));
151
152         // State should not change
153         assertTrue(raftBehavior instanceof Leader);
154
155         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
156         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
157         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
158         assertEquals("Entries size", 1, appendEntries.getEntries().size());
159         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
160         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
161         assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
162     }
163
164     @Test
165     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
166         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
167
168         MockRaftActorContext actorContext = createActorContext();
169
170         leader = new Leader(actorContext);
171
172         actorContext.setLastApplied(0);
173
174         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
175         long term = actorContext.getTermInformation().getCurrentTerm();
176         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
177                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
178
179         actorContext.getReplicatedLog().append(newEntry);
180
181         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
182                 new Replicate(leaderActor, "state-id", newEntry));
183
184         // State should not change
185         assertTrue(raftBehavior instanceof Leader);
186
187         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
188
189         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
190         // one since lastApplied state is 0.
191         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
192                 leaderActor, ApplyState.class);
193         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
194
195         for(int i = 0; i <= newLogIndex - 1; i++ ) {
196             ApplyState applyState = applyStateList.get(i);
197             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
198             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
199         }
200
201         ApplyState last = applyStateList.get((int) newLogIndex - 1);
202         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
203         assertEquals("getIdentifier", "state-id", last.getIdentifier());
204     }
205
206     @Test
207     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
208         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
209
210         MockRaftActorContext actorContext = createActorContextWithFollower();
211
212         Map<String, String> leadersSnapshot = new HashMap<>();
213         leadersSnapshot.put("1", "A");
214         leadersSnapshot.put("2", "B");
215         leadersSnapshot.put("3", "C");
216
217         //clears leaders log
218         actorContext.getReplicatedLog().removeFrom(0);
219
220         final int followersLastIndex = 2;
221         final int snapshotIndex = 3;
222         final int newEntryIndex = 4;
223         final int snapshotTerm = 1;
224         final int currentTerm = 2;
225
226         // set the snapshot variables in replicatedlog
227         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
228         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
229         actorContext.setCommitIndex(followersLastIndex);
230         //set follower timeout to 2 mins, helps during debugging
231         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
232
233         leader = new Leader(actorContext);
234
235         // new entry
236         ReplicatedLogImplEntry entry =
237                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
238                         new MockRaftActorContext.MockPayload("D"));
239
240         //update follower timestamp
241         leader.markFollowerActive(FOLLOWER_ID);
242
243         ByteString bs = toByteString(leadersSnapshot);
244         leader.setSnapshot(Optional.of(bs));
245         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
246         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
247
248         //send first chunk and no InstallSnapshotReply received yet
249         fts.getNextChunk();
250         fts.incrementChunkIndex();
251
252         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
253                 TimeUnit.MILLISECONDS);
254
255         leader.handleMessage(leaderActor, new SendHeartBeat());
256
257         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
258
259         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
260
261         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
262
263         //InstallSnapshotReply received
264         fts.markSendStatus(true);
265
266         leader.handleMessage(leaderActor, new SendHeartBeat());
267
268         InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor,
269                 InstallSnapshot.SERIALIZABLE_CLASS);
270
271         InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
272
273         assertEquals(snapshotIndex, is.getLastIncludedIndex());
274     }
275
276     @Test
277     public void testSendAppendEntriesSnapshotScenario() throws Exception {
278         logStart("testSendAppendEntriesSnapshotScenario");
279
280         MockRaftActorContext actorContext = createActorContextWithFollower();
281
282         Map<String, String> leadersSnapshot = new HashMap<>();
283         leadersSnapshot.put("1", "A");
284         leadersSnapshot.put("2", "B");
285         leadersSnapshot.put("3", "C");
286
287         //clears leaders log
288         actorContext.getReplicatedLog().removeFrom(0);
289
290         final int followersLastIndex = 2;
291         final int snapshotIndex = 3;
292         final int newEntryIndex = 4;
293         final int snapshotTerm = 1;
294         final int currentTerm = 2;
295
296         // set the snapshot variables in replicatedlog
297         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
298         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
299         actorContext.setCommitIndex(followersLastIndex);
300
301         leader = new Leader(actorContext);
302
303         // Leader will send an immediate heartbeat - ignore it.
304         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
305
306         // new entry
307         ReplicatedLogImplEntry entry =
308                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
309                         new MockRaftActorContext.MockPayload("D"));
310
311         //update follower timestamp
312         leader.markFollowerActive(FOLLOWER_ID);
313
314         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
315         RaftActorBehavior raftBehavior = leader.handleMessage(
316                 leaderActor, new Replicate(null, "state-id", entry));
317
318         assertTrue(raftBehavior instanceof Leader);
319
320         MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
321     }
322
323     @Test
324     public void testInitiateInstallSnapshot() throws Exception {
325         logStart("testInitiateInstallSnapshot");
326
327         MockRaftActorContext actorContext = createActorContextWithFollower();
328
329         Map<String, String> leadersSnapshot = new HashMap<>();
330         leadersSnapshot.put("1", "A");
331         leadersSnapshot.put("2", "B");
332         leadersSnapshot.put("3", "C");
333
334         //clears leaders log
335         actorContext.getReplicatedLog().removeFrom(0);
336
337         final int followersLastIndex = 2;
338         final int snapshotIndex = 3;
339         final int newEntryIndex = 4;
340         final int snapshotTerm = 1;
341         final int currentTerm = 2;
342
343         // set the snapshot variables in replicatedlog
344         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
345         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
346         actorContext.setLastApplied(3);
347         actorContext.setCommitIndex(followersLastIndex);
348
349         leader = new Leader(actorContext);
350
351         // Leader will send an immediate heartbeat - ignore it.
352         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
353
354         // set the snapshot as absent and check if capture-snapshot is invoked.
355         leader.setSnapshot(Optional.<ByteString>absent());
356
357         // new entry
358         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
359                 new MockRaftActorContext.MockPayload("D"));
360
361         actorContext.getReplicatedLog().append(entry);
362
363         //update follower timestamp
364         leader.markFollowerActive(FOLLOWER_ID);
365
366         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
367
368         CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
369
370         assertTrue(cs.isInstallSnapshotInitiated());
371         assertEquals(3, cs.getLastAppliedIndex());
372         assertEquals(1, cs.getLastAppliedTerm());
373         assertEquals(4, cs.getLastIndex());
374         assertEquals(2, cs.getLastTerm());
375
376         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
377         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
378
379         List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
380         assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
381     }
382
383     @Test
384     public void testInstallSnapshot() throws Exception {
385         logStart("testInstallSnapshot");
386
387         MockRaftActorContext actorContext = createActorContextWithFollower();
388
389         Map<String, String> leadersSnapshot = new HashMap<>();
390         leadersSnapshot.put("1", "A");
391         leadersSnapshot.put("2", "B");
392         leadersSnapshot.put("3", "C");
393
394         //clears leaders log
395         actorContext.getReplicatedLog().removeFrom(0);
396
397         final int followersLastIndex = 2;
398         final int snapshotIndex = 3;
399         final int snapshotTerm = 1;
400         final int currentTerm = 2;
401
402         // set the snapshot variables in replicatedlog
403         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
404         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
405         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
406         actorContext.setCommitIndex(followersLastIndex);
407
408         leader = new Leader(actorContext);
409
410         // Ignore initial heartbeat.
411         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
412
413         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
414                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
415
416         assertTrue(raftBehavior instanceof Leader);
417
418         // check if installsnapshot gets called with the correct values.
419
420         InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable(
421                 MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class));
422
423         assertNotNull(installSnapshot.getData());
424         assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
425         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
426
427         // FIXME - we don't set the term in the serialized message.
428         //assertEquals(currentTerm, installSnapshot.getTerm());
429     }
430
431     @Test
432     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
433         logStart("testHandleInstallSnapshotReplyLastChunk");
434
435         MockRaftActorContext actorContext = createActorContextWithFollower();
436
437         final int followersLastIndex = 2;
438         final int snapshotIndex = 3;
439         final int snapshotTerm = 1;
440         final int currentTerm = 2;
441
442         actorContext.setCommitIndex(followersLastIndex);
443
444         leader = new Leader(actorContext);
445
446         // Ignore initial heartbeat.
447         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
448
449         Map<String, String> leadersSnapshot = new HashMap<>();
450         leadersSnapshot.put("1", "A");
451         leadersSnapshot.put("2", "B");
452         leadersSnapshot.put("3", "C");
453
454         // set the snapshot variables in replicatedlog
455
456         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
457         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
458         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
459
460         ByteString bs = toByteString(leadersSnapshot);
461         leader.setSnapshot(Optional.of(bs));
462         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
463         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
464         while(!fts.isLastChunk(fts.getChunkIndex())) {
465             fts.getNextChunk();
466             fts.incrementChunkIndex();
467         }
468
469         //clears leaders log
470         actorContext.getReplicatedLog().removeFrom(0);
471
472         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
473                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
474
475         assertTrue(raftBehavior instanceof Leader);
476
477         assertEquals(0, leader.followerSnapshotSize());
478         assertEquals(1, leader.followerLogSize());
479         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
480         assertNotNull(fli);
481         assertEquals(snapshotIndex, fli.getMatchIndex());
482         assertEquals(snapshotIndex, fli.getMatchIndex());
483         assertEquals(snapshotIndex + 1, fli.getNextIndex());
484     }
485
486     @Test
487     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
488         logStart("testSendSnapshotfromInstallSnapshotReply");
489
490         MockRaftActorContext actorContext = createActorContextWithFollower();
491
492         final int followersLastIndex = 2;
493         final int snapshotIndex = 3;
494         final int snapshotTerm = 1;
495         final int currentTerm = 2;
496
497         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
498             @Override
499             public int getSnapshotChunkSize() {
500                 return 50;
501             }
502         };
503         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
504         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
505
506         actorContext.setConfigParams(configParams);
507         actorContext.setCommitIndex(followersLastIndex);
508
509         leader = new Leader(actorContext);
510
511         Map<String, String> leadersSnapshot = new HashMap<>();
512         leadersSnapshot.put("1", "A");
513         leadersSnapshot.put("2", "B");
514         leadersSnapshot.put("3", "C");
515
516         // set the snapshot variables in replicatedlog
517         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
518         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
519         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
520
521         ByteString bs = toByteString(leadersSnapshot);
522         leader.setSnapshot(Optional.of(bs));
523
524         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
525
526         InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
527                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
528
529         assertEquals(1, installSnapshot.getChunkIndex());
530         assertEquals(3, installSnapshot.getTotalChunks());
531
532         followerActor.underlyingActor().clear();
533         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
534                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
535
536         installSnapshot = MessageCollectorActor.expectFirstMatching(
537                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
538
539         assertEquals(2, installSnapshot.getChunkIndex());
540         assertEquals(3, installSnapshot.getTotalChunks());
541
542         followerActor.underlyingActor().clear();
543         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
544                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
545
546         installSnapshot = MessageCollectorActor.expectFirstMatching(
547                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
548
549         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
550         followerActor.underlyingActor().clear();
551         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
552                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
553
554         installSnapshot = MessageCollectorActor.getFirstMatching(
555                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
556
557         Assert.assertNull(installSnapshot);
558     }
559
560
561     @Test
562     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
563         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
564
565         MockRaftActorContext actorContext = createActorContextWithFollower();
566
567         final int followersLastIndex = 2;
568         final int snapshotIndex = 3;
569         final int snapshotTerm = 1;
570         final int currentTerm = 2;
571
572         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
573             @Override
574             public int getSnapshotChunkSize() {
575                 return 50;
576             }
577         });
578
579         actorContext.setCommitIndex(followersLastIndex);
580
581         leader = new Leader(actorContext);
582
583         Map<String, String> leadersSnapshot = new HashMap<>();
584         leadersSnapshot.put("1", "A");
585         leadersSnapshot.put("2", "B");
586         leadersSnapshot.put("3", "C");
587
588         // set the snapshot variables in replicatedlog
589         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
590         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
591         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
592
593         ByteString bs = toByteString(leadersSnapshot);
594         leader.setSnapshot(Optional.of(bs));
595
596         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
597
598         InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
599                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
600
601         assertEquals(1, installSnapshot.getChunkIndex());
602         assertEquals(3, installSnapshot.getTotalChunks());
603
604         followerActor.underlyingActor().clear();
605
606         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
607                 FOLLOWER_ID, -1, false));
608
609         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
610                 TimeUnit.MILLISECONDS);
611
612         leader.handleMessage(leaderActor, new SendHeartBeat());
613
614         installSnapshot = MessageCollectorActor.expectFirstMatching(
615                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
616
617         assertEquals(1, installSnapshot.getChunkIndex());
618         assertEquals(3, installSnapshot.getTotalChunks());
619     }
620
621     @Test
622     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
623         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
624
625         MockRaftActorContext actorContext = createActorContextWithFollower();
626
627         final int followersLastIndex = 2;
628         final int snapshotIndex = 3;
629         final int snapshotTerm = 1;
630         final int currentTerm = 2;
631
632         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
633             @Override
634             public int getSnapshotChunkSize() {
635                 return 50;
636             }
637         });
638
639         actorContext.setCommitIndex(followersLastIndex);
640
641         leader = new Leader(actorContext);
642
643         Map<String, String> leadersSnapshot = new HashMap<>();
644         leadersSnapshot.put("1", "A");
645         leadersSnapshot.put("2", "B");
646         leadersSnapshot.put("3", "C");
647
648         // set the snapshot variables in replicatedlog
649         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
650         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
651         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
652
653         ByteString bs = toByteString(leadersSnapshot);
654         leader.setSnapshot(Optional.of(bs));
655
656         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
657
658         InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
659                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
660
661         assertEquals(1, installSnapshot.getChunkIndex());
662         assertEquals(3, installSnapshot.getTotalChunks());
663         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
664
665         int hashCode = installSnapshot.getData().hashCode();
666
667         followerActor.underlyingActor().clear();
668
669         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
670                 FOLLOWER_ID, 1, true));
671
672         installSnapshot = MessageCollectorActor.expectFirstMatching(
673                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
674
675         assertEquals(2, installSnapshot.getChunkIndex());
676         assertEquals(3, installSnapshot.getTotalChunks());
677         assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
678     }
679
680     @Test
681     public void testFollowerToSnapshotLogic() {
682         logStart("testFollowerToSnapshotLogic");
683
684         MockRaftActorContext actorContext = createActorContext();
685
686         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
687             @Override
688             public int getSnapshotChunkSize() {
689                 return 50;
690             }
691         });
692
693         leader = new Leader(actorContext);
694
695         Map<String, String> leadersSnapshot = new HashMap<>();
696         leadersSnapshot.put("1", "A");
697         leadersSnapshot.put("2", "B");
698         leadersSnapshot.put("3", "C");
699
700         ByteString bs = toByteString(leadersSnapshot);
701         byte[] barray = bs.toByteArray();
702
703         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
704         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
705
706         assertEquals(bs.size(), barray.length);
707
708         int chunkIndex=0;
709         for (int i=0; i < barray.length; i = i + 50) {
710             int j = i + 50;
711             chunkIndex++;
712
713             if (i + 50 > barray.length) {
714                 j = barray.length;
715             }
716
717             ByteString chunk = fts.getNextChunk();
718             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
719             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
720
721             fts.markSendStatus(true);
722             if (!fts.isLastChunk(chunkIndex)) {
723                 fts.incrementChunkIndex();
724             }
725         }
726
727         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
728     }
729
730     @Override protected RaftActorBehavior createBehavior(
731         RaftActorContext actorContext) {
732         return new Leader(actorContext);
733     }
734
735     @Override
736     protected MockRaftActorContext createActorContext() {
737         return createActorContext(leaderActor);
738     }
739
740     @Override
741     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
742         return createActorContext("leader", actorRef);
743     }
744
745     private MockRaftActorContext createActorContextWithFollower() {
746         MockRaftActorContext actorContext = createActorContext();
747         actorContext.setPeerAddresses(ImmutableMap.<String,String>builder().put(FOLLOWER_ID,
748                 followerActor.path().toString()).build());
749         return actorContext;
750     }
751
752     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
753         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
754         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
755         configParams.setElectionTimeoutFactor(100000);
756         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
757         context.setConfigParams(configParams);
758         return context;
759     }
760
761     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
762         AbstractRaftActorBehavior behavior;
763
764         @Override public void onReceive(Object message) throws Exception {
765             if(behavior != null) {
766                 behavior.handleMessage(sender(), message);
767             }
768
769             super.onReceive(message);
770         }
771
772         public static Props props() {
773             return Props.create(ForwardMessageToBehaviorActor.class);
774         }
775     }
776
777     @Test
778     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
779         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
780
781         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
782
783         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
784
785         Follower follower = new Follower(followerActorContext);
786         followerActor.underlyingActor().behavior = follower;
787
788         Map<String, String> peerAddresses = new HashMap<>();
789         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
790
791         leaderActorContext.setPeerAddresses(peerAddresses);
792
793         leaderActorContext.getReplicatedLog().removeFrom(0);
794
795         //create 3 entries
796         leaderActorContext.setReplicatedLog(
797                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
798
799         leaderActorContext.setCommitIndex(1);
800
801         followerActorContext.getReplicatedLog().removeFrom(0);
802
803         // follower too has the exact same log entries and has the same commit index
804         followerActorContext.setReplicatedLog(
805                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
806
807         followerActorContext.setCommitIndex(1);
808
809         leader = new Leader(leaderActorContext);
810
811         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
812
813         assertEquals(1, appendEntries.getLeaderCommit());
814         assertEquals(0, appendEntries.getEntries().size());
815         assertEquals(0, appendEntries.getPrevLogIndex());
816
817         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
818                 leaderActor, AppendEntriesReply.class);
819
820         assertEquals(2, appendEntriesReply.getLogLastIndex());
821         assertEquals(1, appendEntriesReply.getLogLastTerm());
822
823         // follower returns its next index
824         assertEquals(2, appendEntriesReply.getLogLastIndex());
825         assertEquals(1, appendEntriesReply.getLogLastTerm());
826
827         follower.close();
828     }
829
830     @Test
831     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
832         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
833
834         MockRaftActorContext leaderActorContext = createActorContext();
835
836         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
837
838         Follower follower = new Follower(followerActorContext);
839         followerActor.underlyingActor().behavior = follower;
840
841         Map<String, String> peerAddresses = new HashMap<>();
842         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
843
844         leaderActorContext.setPeerAddresses(peerAddresses);
845
846         leaderActorContext.getReplicatedLog().removeFrom(0);
847
848         leaderActorContext.setReplicatedLog(
849                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
850
851         leaderActorContext.setCommitIndex(1);
852
853         followerActorContext.getReplicatedLog().removeFrom(0);
854
855         followerActorContext.setReplicatedLog(
856                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
857
858         // follower has the same log entries but its commit index > leaders commit index
859         followerActorContext.setCommitIndex(2);
860
861         leader = new Leader(leaderActorContext);
862
863         // Initial heartbeat
864         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
865
866         assertEquals(1, appendEntries.getLeaderCommit());
867         assertEquals(0, appendEntries.getEntries().size());
868         assertEquals(0, appendEntries.getPrevLogIndex());
869
870         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
871                 leaderActor, AppendEntriesReply.class);
872
873         assertEquals(2, appendEntriesReply.getLogLastIndex());
874         assertEquals(1, appendEntriesReply.getLogLastTerm());
875
876         leaderActor.underlyingActor().behavior = leader;
877         leader.handleMessage(followerActor, appendEntriesReply);
878
879         leaderActor.underlyingActor().clear();
880         followerActor.underlyingActor().clear();
881
882         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
883                 TimeUnit.MILLISECONDS);
884
885         leader.handleMessage(leaderActor, new SendHeartBeat());
886
887         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
888
889         assertEquals(2, appendEntries.getLeaderCommit());
890         assertEquals(0, appendEntries.getEntries().size());
891         assertEquals(2, appendEntries.getPrevLogIndex());
892
893         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
894
895         assertEquals(2, appendEntriesReply.getLogLastIndex());
896         assertEquals(1, appendEntriesReply.getLogLastTerm());
897
898         assertEquals(2, followerActorContext.getCommitIndex());
899
900         follower.close();
901     }
902
903     @Test
904     public void testHandleAppendEntriesReplyFailure(){
905         logStart("testHandleAppendEntriesReplyFailure");
906
907         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
908
909         leader = new Leader(leaderActorContext);
910
911         // Send initial heartbeat reply with last index.
912         leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
913
914         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
915         assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
916
917         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
918
919         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
920
921         assertEquals(RaftState.Leader, raftActorBehavior.state());
922
923         assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
924     }
925
926     @Test
927     public void testHandleAppendEntriesReplySuccess() throws Exception {
928         logStart("testHandleAppendEntriesReplySuccess");
929
930         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
931
932         leaderActorContext.setReplicatedLog(
933                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
934
935         leaderActorContext.setCommitIndex(1);
936         leaderActorContext.setLastApplied(1);
937         leaderActorContext.getTermInformation().update(1, "leader");
938
939         leader = new Leader(leaderActorContext);
940
941         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
942
943         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
944
945         assertEquals(RaftState.Leader, raftActorBehavior.state());
946
947         assertEquals(2, leaderActorContext.getCommitIndex());
948
949         ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching(
950                 leaderActor, ApplyLogEntries.class);
951
952         assertEquals(2, leaderActorContext.getLastApplied());
953
954         assertEquals(2, applyLogEntries.getToIndex());
955
956         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
957                 ApplyState.class);
958
959         assertEquals(1,applyStateList.size());
960
961         ApplyState applyState = applyStateList.get(0);
962
963         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
964     }
965
966     @Test
967     public void testHandleAppendEntriesReplyUnknownFollower(){
968         logStart("testHandleAppendEntriesReplyUnknownFollower");
969
970         MockRaftActorContext leaderActorContext = createActorContext();
971
972         leader = new Leader(leaderActorContext);
973
974         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
975
976         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
977
978         assertEquals(RaftState.Leader, raftActorBehavior.state());
979     }
980
981     @Test
982     public void testHandleRequestVoteReply(){
983         logStart("testHandleRequestVoteReply");
984
985         MockRaftActorContext leaderActorContext = createActorContext();
986
987         leader = new Leader(leaderActorContext);
988
989         // Should be a no-op.
990         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
991                 new RequestVoteReply(1, true));
992
993         assertEquals(RaftState.Leader, raftActorBehavior.state());
994
995         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
996
997         assertEquals(RaftState.Leader, raftActorBehavior.state());
998     }
999
1000     @Test
1001     public void testIsolatedLeaderCheckNoFollowers() {
1002         logStart("testIsolatedLeaderCheckNoFollowers");
1003
1004         MockRaftActorContext leaderActorContext = createActorContext();
1005
1006         leader = new Leader(leaderActorContext);
1007         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1008         Assert.assertTrue(behavior instanceof Leader);
1009     }
1010
1011     @Test
1012     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1013         logStart("testIsolatedLeaderCheckTwoFollowers");
1014
1015         new JavaTestKit(getSystem()) {{
1016
1017             ActorRef followerActor1 = getTestActor();
1018             ActorRef followerActor2 = getTestActor();
1019
1020             MockRaftActorContext leaderActorContext = createActorContext();
1021
1022             Map<String, String> peerAddresses = new HashMap<>();
1023             peerAddresses.put("follower-1", followerActor1.path().toString());
1024             peerAddresses.put("follower-2", followerActor2.path().toString());
1025
1026             leaderActorContext.setPeerAddresses(peerAddresses);
1027
1028             leader = new Leader(leaderActorContext);
1029             leader.stopIsolatedLeaderCheckSchedule();
1030
1031             leader.markFollowerActive("follower-1");
1032             leader.markFollowerActive("follower-2");
1033             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1034             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1035                 behavior instanceof Leader);
1036
1037             // kill 1 follower and verify if that got killed
1038             final JavaTestKit probe = new JavaTestKit(getSystem());
1039             probe.watch(followerActor1);
1040             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1041             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1042             assertEquals(termMsg1.getActor(), followerActor1);
1043
1044             leader.markFollowerInActive("follower-1");
1045             leader.markFollowerActive("follower-2");
1046             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1047             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1048                 behavior instanceof Leader);
1049
1050             // kill 2nd follower and leader should change to Isolated leader
1051             followerActor2.tell(PoisonPill.getInstance(), null);
1052             probe.watch(followerActor2);
1053             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1054             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1055             assertEquals(termMsg2.getActor(), followerActor2);
1056
1057             leader.markFollowerInActive("follower-2");
1058             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1059             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1060                 behavior instanceof IsolatedLeader);
1061         }};
1062     }
1063
1064
1065     @Test
1066     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1067         logStart("testAppendEntryCallAtEndofAppendEntryReply");
1068
1069         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1070
1071         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1072         //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1073         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1074
1075         leaderActorContext.setConfigParams(configParams);
1076
1077         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1078
1079         followerActorContext.setConfigParams(configParams);
1080
1081         Follower follower = new Follower(followerActorContext);
1082         followerActor.underlyingActor().behavior = follower;
1083
1084         leaderActorContext.getReplicatedLog().removeFrom(0);
1085         leaderActorContext.setCommitIndex(-1);
1086         leaderActorContext.setLastApplied(-1);
1087
1088         followerActorContext.getReplicatedLog().removeFrom(0);
1089         followerActorContext.setCommitIndex(-1);
1090         followerActorContext.setLastApplied(-1);
1091
1092         leader = new Leader(leaderActorContext);
1093
1094         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1095                 leaderActor, AppendEntriesReply.class);
1096
1097         leader.handleMessage(followerActor, appendEntriesReply);
1098
1099         // Clear initial heartbeat messages
1100
1101         leaderActor.underlyingActor().clear();
1102         followerActor.underlyingActor().clear();
1103
1104         // create 3 entries
1105         leaderActorContext.setReplicatedLog(
1106                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1107         leaderActorContext.setCommitIndex(1);
1108         leaderActorContext.setLastApplied(1);
1109
1110         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1111                 TimeUnit.MILLISECONDS);
1112
1113         leader.handleMessage(leaderActor, new SendHeartBeat());
1114
1115         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1116
1117         // Should send first log entry
1118         assertEquals(1, appendEntries.getLeaderCommit());
1119         assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1120         assertEquals(-1, appendEntries.getPrevLogIndex());
1121
1122         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1123
1124         assertEquals(1, appendEntriesReply.getLogLastTerm());
1125         assertEquals(0, appendEntriesReply.getLogLastIndex());
1126
1127         followerActor.underlyingActor().clear();
1128
1129         leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1130
1131         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1132
1133         // Should send second log entry
1134         assertEquals(1, appendEntries.getLeaderCommit());
1135         assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1136
1137         follower.close();
1138     }
1139
1140     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1141
1142         private final long electionTimeOutIntervalMillis;
1143         private final int snapshotChunkSize;
1144
1145         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1146             super();
1147             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1148             this.snapshotChunkSize = snapshotChunkSize;
1149         }
1150
1151         @Override
1152         public FiniteDuration getElectionTimeOutInterval() {
1153             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1154         }
1155
1156         @Override
1157         public int getSnapshotChunkSize() {
1158             return snapshotChunkSize;
1159         }
1160     }
1161 }