02c391f146630277c9fc60233193bb8108cd6961
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
29 import org.opendaylight.controller.cluster.raft.SerializationUtils;
30 import org.opendaylight.controller.cluster.raft.TestActorFactory;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
35 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
38 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
45 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
46 import scala.concurrent.duration.FiniteDuration;
47
48 public class LeaderTest extends AbstractRaftActorBehaviorTest {
49
50     static final String FOLLOWER_ID = "follower";
51
52     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
53
54     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
55             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
56
57     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
58             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
59
60     private Leader leader;
61
62     @After
63     public void tearDown() throws Exception {
64         if(leader != null) {
65             leader.close();
66         }
67
68         actorFactory.close();
69     }
70
71     @Test
72     public void testHandleMessageForUnknownMessage() throws Exception {
73         logStart("testHandleMessageForUnknownMessage");
74
75         leader = new Leader(createActorContext());
76
77         // handle message should return the Leader state when it receives an
78         // unknown message
79         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
80         Assert.assertTrue(behavior instanceof Leader);
81     }
82
83     @Test
84     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
85         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
86
87         MockRaftActorContext actorContext = createActorContextWithFollower();
88
89         long term = 1;
90         actorContext.getTermInformation().update(term, "");
91
92         leader = new Leader(actorContext);
93
94         // Leader should send an immediate heartbeat with no entries as follower is inactive.
95         long lastIndex = actorContext.getReplicatedLog().lastIndex();
96         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
97         assertEquals("getTerm", term, appendEntries.getTerm());
98         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
99         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
100         assertEquals("Entries size", 0, appendEntries.getEntries().size());
101
102         // The follower would normally reply - simulate that explicitly here.
103         leader.handleMessage(followerActor, new AppendEntriesReply(
104                 FOLLOWER_ID, term, true, lastIndex - 1, term));
105         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
106
107         followerActor.underlyingActor().clear();
108
109         // Sleep for the heartbeat interval so AppendEntries is sent.
110         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
111                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
112
113         leader.handleMessage(leaderActor, new SendHeartBeat());
114
115         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
116         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
117         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
118         assertEquals("Entries size", 1, appendEntries.getEntries().size());
119         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
120         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
121     }
122
123     @Test
124     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
125         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
126
127         MockRaftActorContext actorContext = createActorContextWithFollower();
128
129         long term = 1;
130         actorContext.getTermInformation().update(term, "");
131
132         leader = new Leader(actorContext);
133
134         // Leader will send an immediate heartbeat - ignore it.
135         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
136
137         // The follower would normally reply - simulate that explicitly here.
138         long lastIndex = actorContext.getReplicatedLog().lastIndex();
139         leader.handleMessage(followerActor, new AppendEntriesReply(
140                 FOLLOWER_ID, term, true, lastIndex, term));
141         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
142
143         followerActor.underlyingActor().clear();
144
145         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
146         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
147                 1, lastIndex + 1, payload);
148         actorContext.getReplicatedLog().append(newEntry);
149         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
150                 new Replicate(null, null, newEntry));
151
152         // State should not change
153         assertTrue(raftBehavior instanceof Leader);
154
155         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
156         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
157         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
158         assertEquals("Entries size", 1, appendEntries.getEntries().size());
159         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
160         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
161         assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
162     }
163
164     @Test
165     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
166         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
167
168         MockRaftActorContext actorContext = createActorContext();
169
170         leader = new Leader(actorContext);
171
172         actorContext.setLastApplied(0);
173
174         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
175         long term = actorContext.getTermInformation().getCurrentTerm();
176         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
177                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
178
179         actorContext.getReplicatedLog().append(newEntry);
180
181         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
182                 new Replicate(leaderActor, "state-id", newEntry));
183
184         // State should not change
185         assertTrue(raftBehavior instanceof Leader);
186
187         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
188
189         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
190         // one since lastApplied state is 0.
191         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
192                 leaderActor, ApplyState.class);
193         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
194
195         for(int i = 0; i <= newLogIndex - 1; i++ ) {
196             ApplyState applyState = applyStateList.get(i);
197             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
198             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
199         }
200
201         ApplyState last = applyStateList.get((int) newLogIndex - 1);
202         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
203         assertEquals("getIdentifier", "state-id", last.getIdentifier());
204     }
205
206     @Test
207     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
208         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
209
210         MockRaftActorContext actorContext = createActorContextWithFollower();
211
212         Map<String, String> leadersSnapshot = new HashMap<>();
213         leadersSnapshot.put("1", "A");
214         leadersSnapshot.put("2", "B");
215         leadersSnapshot.put("3", "C");
216
217         //clears leaders log
218         actorContext.getReplicatedLog().removeFrom(0);
219
220         final int followersLastIndex = 2;
221         final int snapshotIndex = 3;
222         final int newEntryIndex = 4;
223         final int snapshotTerm = 1;
224         final int currentTerm = 2;
225
226         // set the snapshot variables in replicatedlog
227         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
228         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
229         actorContext.setCommitIndex(followersLastIndex);
230         //set follower timeout to 2 mins, helps during debugging
231         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
232
233         leader = new Leader(actorContext);
234
235         // new entry
236         ReplicatedLogImplEntry entry =
237                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
238                         new MockRaftActorContext.MockPayload("D"));
239
240         //update follower timestamp
241         leader.markFollowerActive(FOLLOWER_ID);
242
243         ByteString bs = toByteString(leadersSnapshot);
244         leader.setSnapshot(Optional.of(bs));
245         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
246         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
247
248         //send first chunk and no InstallSnapshotReply received yet
249         fts.getNextChunk();
250         fts.incrementChunkIndex();
251
252         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
253                 TimeUnit.MILLISECONDS);
254
255         leader.handleMessage(leaderActor, new SendHeartBeat());
256
257         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
258
259         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
260
261         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
262
263         //InstallSnapshotReply received
264         fts.markSendStatus(true);
265
266         leader.handleMessage(leaderActor, new SendHeartBeat());
267
268         InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor,
269                 InstallSnapshot.SERIALIZABLE_CLASS);
270
271         InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
272
273         assertEquals(snapshotIndex, is.getLastIncludedIndex());
274     }
275
276     @Test
277     public void testSendAppendEntriesSnapshotScenario() throws Exception {
278         logStart("testSendAppendEntriesSnapshotScenario");
279
280         MockRaftActorContext actorContext = createActorContextWithFollower();
281
282         Map<String, String> leadersSnapshot = new HashMap<>();
283         leadersSnapshot.put("1", "A");
284         leadersSnapshot.put("2", "B");
285         leadersSnapshot.put("3", "C");
286
287         //clears leaders log
288         actorContext.getReplicatedLog().removeFrom(0);
289
290         final int followersLastIndex = 2;
291         final int snapshotIndex = 3;
292         final int newEntryIndex = 4;
293         final int snapshotTerm = 1;
294         final int currentTerm = 2;
295
296         // set the snapshot variables in replicatedlog
297         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
298         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
299         actorContext.setCommitIndex(followersLastIndex);
300
301         leader = new Leader(actorContext);
302
303         // Leader will send an immediate heartbeat - ignore it.
304         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
305
306         // new entry
307         ReplicatedLogImplEntry entry =
308                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
309                         new MockRaftActorContext.MockPayload("D"));
310
311         //update follower timestamp
312         leader.markFollowerActive(FOLLOWER_ID);
313
314         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
315         RaftActorBehavior raftBehavior = leader.handleMessage(
316                 leaderActor, new Replicate(null, "state-id", entry));
317
318         assertTrue(raftBehavior instanceof Leader);
319
320         MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
321     }
322
323     @Test
324     public void testInitiateInstallSnapshot() throws Exception {
325         logStart("testInitiateInstallSnapshot");
326
327         MockRaftActorContext actorContext = createActorContextWithFollower();
328
329         Map<String, String> leadersSnapshot = new HashMap<>();
330         leadersSnapshot.put("1", "A");
331         leadersSnapshot.put("2", "B");
332         leadersSnapshot.put("3", "C");
333
334         //clears leaders log
335         actorContext.getReplicatedLog().removeFrom(0);
336
337         final int followersLastIndex = 2;
338         final int snapshotIndex = 3;
339         final int newEntryIndex = 4;
340         final int snapshotTerm = 1;
341         final int currentTerm = 2;
342
343         // set the snapshot variables in replicatedlog
344         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
345         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
346         actorContext.setLastApplied(3);
347         actorContext.setCommitIndex(followersLastIndex);
348
349         leader = new Leader(actorContext);
350
351         // Leader will send an immediate heartbeat - ignore it.
352         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
353
354         // set the snapshot as absent and check if capture-snapshot is invoked.
355         leader.setSnapshot(Optional.<ByteString>absent());
356
357         // new entry
358         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
359                 new MockRaftActorContext.MockPayload("D"));
360
361         actorContext.getReplicatedLog().append(entry);
362
363         //update follower timestamp
364         leader.markFollowerActive(FOLLOWER_ID);
365
366         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
367
368         CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
369
370         assertTrue(cs.isInstallSnapshotInitiated());
371         assertEquals(3, cs.getLastAppliedIndex());
372         assertEquals(1, cs.getLastAppliedTerm());
373         assertEquals(4, cs.getLastIndex());
374         assertEquals(2, cs.getLastTerm());
375
376         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
377         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
378
379         List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
380         assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
381     }
382
383     @Test
384     public void testInstallSnapshot() throws Exception {
385         logStart("testInstallSnapshot");
386
387         MockRaftActorContext actorContext = createActorContextWithFollower();
388
389         Map<String, String> leadersSnapshot = new HashMap<>();
390         leadersSnapshot.put("1", "A");
391         leadersSnapshot.put("2", "B");
392         leadersSnapshot.put("3", "C");
393
394         //clears leaders log
395         actorContext.getReplicatedLog().removeFrom(0);
396
397         final int followersLastIndex = 2;
398         final int snapshotIndex = 3;
399         final int snapshotTerm = 1;
400         final int currentTerm = 2;
401
402         // set the snapshot variables in replicatedlog
403         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
404         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
405         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
406         actorContext.setCommitIndex(followersLastIndex);
407
408         leader = new Leader(actorContext);
409
410         // Ignore initial heartbeat.
411         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
412
413         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
414                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
415
416         assertTrue(raftBehavior instanceof Leader);
417
418         // check if installsnapshot gets called with the correct values.
419
420         InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable(
421                 MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class));
422
423         assertNotNull(installSnapshot.getData());
424         assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
425         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
426
427         assertEquals(currentTerm, installSnapshot.getTerm());
428     }
429
430     @Test
431     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
432         logStart("testHandleInstallSnapshotReplyLastChunk");
433
434         MockRaftActorContext actorContext = createActorContextWithFollower();
435
436         final int followersLastIndex = 2;
437         final int snapshotIndex = 3;
438         final int snapshotTerm = 1;
439         final int currentTerm = 2;
440
441         actorContext.setCommitIndex(followersLastIndex);
442
443         leader = new Leader(actorContext);
444
445         // Ignore initial heartbeat.
446         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
447
448         Map<String, String> leadersSnapshot = new HashMap<>();
449         leadersSnapshot.put("1", "A");
450         leadersSnapshot.put("2", "B");
451         leadersSnapshot.put("3", "C");
452
453         // set the snapshot variables in replicatedlog
454
455         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
456         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
457         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
458
459         ByteString bs = toByteString(leadersSnapshot);
460         leader.setSnapshot(Optional.of(bs));
461         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
462         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
463         while(!fts.isLastChunk(fts.getChunkIndex())) {
464             fts.getNextChunk();
465             fts.incrementChunkIndex();
466         }
467
468         //clears leaders log
469         actorContext.getReplicatedLog().removeFrom(0);
470
471         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
472                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
473
474         assertTrue(raftBehavior instanceof Leader);
475
476         assertEquals(0, leader.followerSnapshotSize());
477         assertEquals(1, leader.followerLogSize());
478         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
479         assertNotNull(fli);
480         assertEquals(snapshotIndex, fli.getMatchIndex());
481         assertEquals(snapshotIndex, fli.getMatchIndex());
482         assertEquals(snapshotIndex + 1, fli.getNextIndex());
483     }
484
485     @Test
486     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
487         logStart("testSendSnapshotfromInstallSnapshotReply");
488
489         MockRaftActorContext actorContext = createActorContextWithFollower();
490
491         final int followersLastIndex = 2;
492         final int snapshotIndex = 3;
493         final int snapshotTerm = 1;
494         final int currentTerm = 2;
495
496         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
497             @Override
498             public int getSnapshotChunkSize() {
499                 return 50;
500             }
501         };
502         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
503         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
504
505         actorContext.setConfigParams(configParams);
506         actorContext.setCommitIndex(followersLastIndex);
507
508         leader = new Leader(actorContext);
509
510         Map<String, String> leadersSnapshot = new HashMap<>();
511         leadersSnapshot.put("1", "A");
512         leadersSnapshot.put("2", "B");
513         leadersSnapshot.put("3", "C");
514
515         // set the snapshot variables in replicatedlog
516         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
517         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
518         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
519
520         ByteString bs = toByteString(leadersSnapshot);
521         leader.setSnapshot(Optional.of(bs));
522
523         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
524
525         InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
526                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
527
528         assertEquals(1, installSnapshot.getChunkIndex());
529         assertEquals(3, installSnapshot.getTotalChunks());
530
531         followerActor.underlyingActor().clear();
532         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
533                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
534
535         installSnapshot = MessageCollectorActor.expectFirstMatching(
536                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
537
538         assertEquals(2, installSnapshot.getChunkIndex());
539         assertEquals(3, installSnapshot.getTotalChunks());
540
541         followerActor.underlyingActor().clear();
542         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
543                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
544
545         installSnapshot = MessageCollectorActor.expectFirstMatching(
546                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
547
548         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
549         followerActor.underlyingActor().clear();
550         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
551                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
552
553         installSnapshot = MessageCollectorActor.getFirstMatching(
554                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
555
556         Assert.assertNull(installSnapshot);
557     }
558
559
560     @Test
561     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
562         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
563
564         MockRaftActorContext actorContext = createActorContextWithFollower();
565
566         final int followersLastIndex = 2;
567         final int snapshotIndex = 3;
568         final int snapshotTerm = 1;
569         final int currentTerm = 2;
570
571         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
572             @Override
573             public int getSnapshotChunkSize() {
574                 return 50;
575             }
576         });
577
578         actorContext.setCommitIndex(followersLastIndex);
579
580         leader = new Leader(actorContext);
581
582         Map<String, String> leadersSnapshot = new HashMap<>();
583         leadersSnapshot.put("1", "A");
584         leadersSnapshot.put("2", "B");
585         leadersSnapshot.put("3", "C");
586
587         // set the snapshot variables in replicatedlog
588         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
589         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
590         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
591
592         ByteString bs = toByteString(leadersSnapshot);
593         leader.setSnapshot(Optional.of(bs));
594
595         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
596
597         InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
598                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
599
600         assertEquals(1, installSnapshot.getChunkIndex());
601         assertEquals(3, installSnapshot.getTotalChunks());
602
603         followerActor.underlyingActor().clear();
604
605         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
606                 FOLLOWER_ID, -1, false));
607
608         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
609                 TimeUnit.MILLISECONDS);
610
611         leader.handleMessage(leaderActor, new SendHeartBeat());
612
613         installSnapshot = MessageCollectorActor.expectFirstMatching(
614                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
615
616         assertEquals(1, installSnapshot.getChunkIndex());
617         assertEquals(3, installSnapshot.getTotalChunks());
618     }
619
620     @Test
621     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
622         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
623
624         MockRaftActorContext actorContext = createActorContextWithFollower();
625
626         final int followersLastIndex = 2;
627         final int snapshotIndex = 3;
628         final int snapshotTerm = 1;
629         final int currentTerm = 2;
630
631         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
632             @Override
633             public int getSnapshotChunkSize() {
634                 return 50;
635             }
636         });
637
638         actorContext.setCommitIndex(followersLastIndex);
639
640         leader = new Leader(actorContext);
641
642         Map<String, String> leadersSnapshot = new HashMap<>();
643         leadersSnapshot.put("1", "A");
644         leadersSnapshot.put("2", "B");
645         leadersSnapshot.put("3", "C");
646
647         // set the snapshot variables in replicatedlog
648         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
649         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
650         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
651
652         ByteString bs = toByteString(leadersSnapshot);
653         leader.setSnapshot(Optional.of(bs));
654
655         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
656
657         InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
658                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
659
660         assertEquals(1, installSnapshot.getChunkIndex());
661         assertEquals(3, installSnapshot.getTotalChunks());
662         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
663
664         int hashCode = installSnapshot.getData().hashCode();
665
666         followerActor.underlyingActor().clear();
667
668         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
669                 FOLLOWER_ID, 1, true));
670
671         installSnapshot = MessageCollectorActor.expectFirstMatching(
672                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
673
674         assertEquals(2, installSnapshot.getChunkIndex());
675         assertEquals(3, installSnapshot.getTotalChunks());
676         assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
677     }
678
679     @Test
680     public void testFollowerToSnapshotLogic() {
681         logStart("testFollowerToSnapshotLogic");
682
683         MockRaftActorContext actorContext = createActorContext();
684
685         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
686             @Override
687             public int getSnapshotChunkSize() {
688                 return 50;
689             }
690         });
691
692         leader = new Leader(actorContext);
693
694         Map<String, String> leadersSnapshot = new HashMap<>();
695         leadersSnapshot.put("1", "A");
696         leadersSnapshot.put("2", "B");
697         leadersSnapshot.put("3", "C");
698
699         ByteString bs = toByteString(leadersSnapshot);
700         byte[] barray = bs.toByteArray();
701
702         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
703         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
704
705         assertEquals(bs.size(), barray.length);
706
707         int chunkIndex=0;
708         for (int i=0; i < barray.length; i = i + 50) {
709             int j = i + 50;
710             chunkIndex++;
711
712             if (i + 50 > barray.length) {
713                 j = barray.length;
714             }
715
716             ByteString chunk = fts.getNextChunk();
717             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
718             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
719
720             fts.markSendStatus(true);
721             if (!fts.isLastChunk(chunkIndex)) {
722                 fts.incrementChunkIndex();
723             }
724         }
725
726         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
727     }
728
729     @Override protected RaftActorBehavior createBehavior(
730         RaftActorContext actorContext) {
731         return new Leader(actorContext);
732     }
733
734     @Override
735     protected MockRaftActorContext createActorContext() {
736         return createActorContext(leaderActor);
737     }
738
739     @Override
740     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
741         return createActorContext("leader", actorRef);
742     }
743
744     private MockRaftActorContext createActorContextWithFollower() {
745         MockRaftActorContext actorContext = createActorContext();
746         actorContext.setPeerAddresses(ImmutableMap.<String,String>builder().put(FOLLOWER_ID,
747                 followerActor.path().toString()).build());
748         return actorContext;
749     }
750
751     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
752         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
753         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
754         configParams.setElectionTimeoutFactor(100000);
755         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
756         context.setConfigParams(configParams);
757         return context;
758     }
759
760     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
761         AbstractRaftActorBehavior behavior;
762
763         @Override public void onReceive(Object message) throws Exception {
764             if(behavior != null) {
765                 behavior.handleMessage(sender(), message);
766             }
767
768             super.onReceive(message);
769         }
770
771         public static Props props() {
772             return Props.create(ForwardMessageToBehaviorActor.class);
773         }
774     }
775
776     @Test
777     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
778         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
779
780         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
781
782         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
783
784         Follower follower = new Follower(followerActorContext);
785         followerActor.underlyingActor().behavior = follower;
786
787         Map<String, String> peerAddresses = new HashMap<>();
788         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
789
790         leaderActorContext.setPeerAddresses(peerAddresses);
791
792         leaderActorContext.getReplicatedLog().removeFrom(0);
793
794         //create 3 entries
795         leaderActorContext.setReplicatedLog(
796                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
797
798         leaderActorContext.setCommitIndex(1);
799
800         followerActorContext.getReplicatedLog().removeFrom(0);
801
802         // follower too has the exact same log entries and has the same commit index
803         followerActorContext.setReplicatedLog(
804                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
805
806         followerActorContext.setCommitIndex(1);
807
808         leader = new Leader(leaderActorContext);
809
810         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
811
812         assertEquals(1, appendEntries.getLeaderCommit());
813         assertEquals(0, appendEntries.getEntries().size());
814         assertEquals(0, appendEntries.getPrevLogIndex());
815
816         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
817                 leaderActor, AppendEntriesReply.class);
818
819         assertEquals(2, appendEntriesReply.getLogLastIndex());
820         assertEquals(1, appendEntriesReply.getLogLastTerm());
821
822         // follower returns its next index
823         assertEquals(2, appendEntriesReply.getLogLastIndex());
824         assertEquals(1, appendEntriesReply.getLogLastTerm());
825
826         follower.close();
827     }
828
829     @Test
830     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
831         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
832
833         MockRaftActorContext leaderActorContext = createActorContext();
834
835         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
836
837         Follower follower = new Follower(followerActorContext);
838         followerActor.underlyingActor().behavior = follower;
839
840         Map<String, String> peerAddresses = new HashMap<>();
841         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
842
843         leaderActorContext.setPeerAddresses(peerAddresses);
844
845         leaderActorContext.getReplicatedLog().removeFrom(0);
846
847         leaderActorContext.setReplicatedLog(
848                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
849
850         leaderActorContext.setCommitIndex(1);
851
852         followerActorContext.getReplicatedLog().removeFrom(0);
853
854         followerActorContext.setReplicatedLog(
855                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
856
857         // follower has the same log entries but its commit index > leaders commit index
858         followerActorContext.setCommitIndex(2);
859
860         leader = new Leader(leaderActorContext);
861
862         // Initial heartbeat
863         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
864
865         assertEquals(1, appendEntries.getLeaderCommit());
866         assertEquals(0, appendEntries.getEntries().size());
867         assertEquals(0, appendEntries.getPrevLogIndex());
868
869         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
870                 leaderActor, AppendEntriesReply.class);
871
872         assertEquals(2, appendEntriesReply.getLogLastIndex());
873         assertEquals(1, appendEntriesReply.getLogLastTerm());
874
875         leaderActor.underlyingActor().behavior = leader;
876         leader.handleMessage(followerActor, appendEntriesReply);
877
878         leaderActor.underlyingActor().clear();
879         followerActor.underlyingActor().clear();
880
881         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
882                 TimeUnit.MILLISECONDS);
883
884         leader.handleMessage(leaderActor, new SendHeartBeat());
885
886         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
887
888         assertEquals(2, appendEntries.getLeaderCommit());
889         assertEquals(0, appendEntries.getEntries().size());
890         assertEquals(2, appendEntries.getPrevLogIndex());
891
892         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
893
894         assertEquals(2, appendEntriesReply.getLogLastIndex());
895         assertEquals(1, appendEntriesReply.getLogLastTerm());
896
897         assertEquals(2, followerActorContext.getCommitIndex());
898
899         follower.close();
900     }
901
902     @Test
903     public void testHandleAppendEntriesReplyFailure(){
904         logStart("testHandleAppendEntriesReplyFailure");
905
906         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
907
908         leader = new Leader(leaderActorContext);
909
910         // Send initial heartbeat reply with last index.
911         leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
912
913         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
914         assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
915
916         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
917
918         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
919
920         assertEquals(RaftState.Leader, raftActorBehavior.state());
921
922         assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
923     }
924
925     @Test
926     public void testHandleAppendEntriesReplySuccess() throws Exception {
927         logStart("testHandleAppendEntriesReplySuccess");
928
929         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
930
931         leaderActorContext.setReplicatedLog(
932                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
933
934         leaderActorContext.setCommitIndex(1);
935         leaderActorContext.setLastApplied(1);
936         leaderActorContext.getTermInformation().update(1, "leader");
937
938         leader = new Leader(leaderActorContext);
939
940         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
941
942         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
943
944         assertEquals(RaftState.Leader, raftActorBehavior.state());
945
946         assertEquals(2, leaderActorContext.getCommitIndex());
947
948         ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching(
949                 leaderActor, ApplyLogEntries.class);
950
951         assertEquals(2, leaderActorContext.getLastApplied());
952
953         assertEquals(2, applyLogEntries.getToIndex());
954
955         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
956                 ApplyState.class);
957
958         assertEquals(1,applyStateList.size());
959
960         ApplyState applyState = applyStateList.get(0);
961
962         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
963     }
964
965     @Test
966     public void testHandleAppendEntriesReplyUnknownFollower(){
967         logStart("testHandleAppendEntriesReplyUnknownFollower");
968
969         MockRaftActorContext leaderActorContext = createActorContext();
970
971         leader = new Leader(leaderActorContext);
972
973         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
974
975         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
976
977         assertEquals(RaftState.Leader, raftActorBehavior.state());
978     }
979
980     @Test
981     public void testHandleRequestVoteReply(){
982         logStart("testHandleRequestVoteReply");
983
984         MockRaftActorContext leaderActorContext = createActorContext();
985
986         leader = new Leader(leaderActorContext);
987
988         // Should be a no-op.
989         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
990                 new RequestVoteReply(1, true));
991
992         assertEquals(RaftState.Leader, raftActorBehavior.state());
993
994         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
995
996         assertEquals(RaftState.Leader, raftActorBehavior.state());
997     }
998
999     @Test
1000     public void testIsolatedLeaderCheckNoFollowers() {
1001         logStart("testIsolatedLeaderCheckNoFollowers");
1002
1003         MockRaftActorContext leaderActorContext = createActorContext();
1004
1005         leader = new Leader(leaderActorContext);
1006         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1007         Assert.assertTrue(behavior instanceof Leader);
1008     }
1009
1010     @Test
1011     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1012         logStart("testIsolatedLeaderCheckTwoFollowers");
1013
1014         new JavaTestKit(getSystem()) {{
1015
1016             ActorRef followerActor1 = getTestActor();
1017             ActorRef followerActor2 = getTestActor();
1018
1019             MockRaftActorContext leaderActorContext = createActorContext();
1020
1021             Map<String, String> peerAddresses = new HashMap<>();
1022             peerAddresses.put("follower-1", followerActor1.path().toString());
1023             peerAddresses.put("follower-2", followerActor2.path().toString());
1024
1025             leaderActorContext.setPeerAddresses(peerAddresses);
1026
1027             leader = new Leader(leaderActorContext);
1028             leader.stopIsolatedLeaderCheckSchedule();
1029
1030             leader.markFollowerActive("follower-1");
1031             leader.markFollowerActive("follower-2");
1032             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1033             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1034                 behavior instanceof Leader);
1035
1036             // kill 1 follower and verify if that got killed
1037             final JavaTestKit probe = new JavaTestKit(getSystem());
1038             probe.watch(followerActor1);
1039             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1040             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1041             assertEquals(termMsg1.getActor(), followerActor1);
1042
1043             leader.markFollowerInActive("follower-1");
1044             leader.markFollowerActive("follower-2");
1045             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1046             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1047                 behavior instanceof Leader);
1048
1049             // kill 2nd follower and leader should change to Isolated leader
1050             followerActor2.tell(PoisonPill.getInstance(), null);
1051             probe.watch(followerActor2);
1052             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1053             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1054             assertEquals(termMsg2.getActor(), followerActor2);
1055
1056             leader.markFollowerInActive("follower-2");
1057             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1058             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1059                 behavior instanceof IsolatedLeader);
1060         }};
1061     }
1062
1063
1064     @Test
1065     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1066         logStart("testAppendEntryCallAtEndofAppendEntryReply");
1067
1068         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1069
1070         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1071         //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1072         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1073
1074         leaderActorContext.setConfigParams(configParams);
1075
1076         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1077
1078         followerActorContext.setConfigParams(configParams);
1079
1080         Follower follower = new Follower(followerActorContext);
1081         followerActor.underlyingActor().behavior = follower;
1082
1083         leaderActorContext.getReplicatedLog().removeFrom(0);
1084         leaderActorContext.setCommitIndex(-1);
1085         leaderActorContext.setLastApplied(-1);
1086
1087         followerActorContext.getReplicatedLog().removeFrom(0);
1088         followerActorContext.setCommitIndex(-1);
1089         followerActorContext.setLastApplied(-1);
1090
1091         leader = new Leader(leaderActorContext);
1092
1093         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1094                 leaderActor, AppendEntriesReply.class);
1095
1096         leader.handleMessage(followerActor, appendEntriesReply);
1097
1098         // Clear initial heartbeat messages
1099
1100         leaderActor.underlyingActor().clear();
1101         followerActor.underlyingActor().clear();
1102
1103         // create 3 entries
1104         leaderActorContext.setReplicatedLog(
1105                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1106         leaderActorContext.setCommitIndex(1);
1107         leaderActorContext.setLastApplied(1);
1108
1109         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1110                 TimeUnit.MILLISECONDS);
1111
1112         leader.handleMessage(leaderActor, new SendHeartBeat());
1113
1114         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1115
1116         // Should send first log entry
1117         assertEquals(1, appendEntries.getLeaderCommit());
1118         assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1119         assertEquals(-1, appendEntries.getPrevLogIndex());
1120
1121         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1122
1123         assertEquals(1, appendEntriesReply.getLogLastTerm());
1124         assertEquals(0, appendEntriesReply.getLogLastIndex());
1125
1126         followerActor.underlyingActor().clear();
1127
1128         leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1129
1130         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1131
1132         // Should send second log entry
1133         assertEquals(1, appendEntries.getLeaderCommit());
1134         assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1135
1136         follower.close();
1137     }
1138
1139     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1140
1141         private final long electionTimeOutIntervalMillis;
1142         private final int snapshotChunkSize;
1143
1144         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1145             super();
1146             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1147             this.snapshotChunkSize = snapshotChunkSize;
1148         }
1149
1150         @Override
1151         public FiniteDuration getElectionTimeOutInterval() {
1152             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1153         }
1154
1155         @Override
1156         public int getSnapshotChunkSize() {
1157             return snapshotChunkSize;
1158         }
1159     }
1160 }