Refactor LeaderTest
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.io.ByteArrayOutputStream;
17 import java.io.IOException;
18 import java.io.ObjectOutputStream;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.After;
24 import org.junit.Assert;
25 import org.junit.Test;
26 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
27 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
28 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
29 import org.opendaylight.controller.cluster.raft.RaftActorContext;
30 import org.opendaylight.controller.cluster.raft.RaftState;
31 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
32 import org.opendaylight.controller.cluster.raft.SerializationUtils;
33 import org.opendaylight.controller.cluster.raft.TestActorFactory;
34 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
35 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
36 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
37 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
38 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
39 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
40 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
43 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
44 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
45 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
46 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
47 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
48 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
49 import org.slf4j.LoggerFactory;
50 import scala.concurrent.duration.FiniteDuration;
51
52 public class LeaderTest extends AbstractRaftActorBehaviorTest {
53
54     static final String FOLLOWER_ID = "follower";
55
56     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
57
58     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
59             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
60
61     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
62             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
63
64     private Leader leader;
65
66     @After
67     public void tearDown() throws Exception {
68         if(leader != null) {
69             leader.close();
70         }
71
72         actorFactory.close();
73     }
74
75     private void logStart(String name) {
76         LoggerFactory.getLogger(LeaderTest.class).info("Starting " + name);
77     }
78
79     @Test
80     public void testHandleMessageForUnknownMessage() throws Exception {
81         logStart("testHandleMessageForUnknownMessage");
82
83         leader = new Leader(createActorContext());
84
85         // handle message should return the Leader state when it receives an
86         // unknown message
87         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
88         Assert.assertTrue(behavior instanceof Leader);
89     }
90
91     @Test
92     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
93         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
94
95         MockRaftActorContext actorContext = createActorContextWithFollower();
96
97         long term = 1;
98         actorContext.getTermInformation().update(term, "");
99
100         leader = new Leader(actorContext);
101
102         // Leader should send an immediate heartbeat with no entries as follower is inactive.
103         long lastIndex = actorContext.getReplicatedLog().lastIndex();
104         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
105         assertEquals("getTerm", term, appendEntries.getTerm());
106         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
107         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
108         assertEquals("Entries size", 0, appendEntries.getEntries().size());
109
110         // The follower would normally reply - simulate that explicitly here.
111         leader.handleMessage(followerActor, new AppendEntriesReply(
112                 FOLLOWER_ID, term, true, lastIndex - 1, term));
113         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
114
115         followerActor.underlyingActor().clear();
116
117         // Sleep for the heartbeat interval so AppendEntries is sent.
118         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
119                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
120
121         leader.handleMessage(leaderActor, new SendHeartBeat());
122
123         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
124         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
125         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
126         assertEquals("Entries size", 1, appendEntries.getEntries().size());
127         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
128         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
129     }
130
131     @Test
132     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
133         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
134
135         MockRaftActorContext actorContext = createActorContextWithFollower();
136
137         long term = 1;
138         actorContext.getTermInformation().update(term, "");
139
140         leader = new Leader(actorContext);
141
142         // Leader will send an immediate heartbeat - ignore it.
143         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
144
145         // The follower would normally reply - simulate that explicitly here.
146         long lastIndex = actorContext.getReplicatedLog().lastIndex();
147         leader.handleMessage(followerActor, new AppendEntriesReply(
148                 FOLLOWER_ID, term, true, lastIndex, term));
149         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
150
151         followerActor.underlyingActor().clear();
152
153         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
154         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
155                 1, lastIndex + 1, payload);
156         actorContext.getReplicatedLog().append(newEntry);
157         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
158                 new Replicate(null, null, newEntry));
159
160         // State should not change
161         assertTrue(raftBehavior instanceof Leader);
162
163         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
164         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
165         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
166         assertEquals("Entries size", 1, appendEntries.getEntries().size());
167         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
168         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
169         assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
170     }
171
172     @Test
173     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
174         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
175
176         MockRaftActorContext actorContext = createActorContext();
177
178         leader = new Leader(actorContext);
179
180         actorContext.setLastApplied(0);
181
182         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
183         long term = actorContext.getTermInformation().getCurrentTerm();
184         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
185                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
186
187         actorContext.getReplicatedLog().append(newEntry);
188
189         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
190                 new Replicate(leaderActor, "state-id", newEntry));
191
192         // State should not change
193         assertTrue(raftBehavior instanceof Leader);
194
195         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
196
197         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
198         // one since lastApplied state is 0.
199         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
200                 leaderActor, ApplyState.class);
201         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
202
203         for(int i = 0; i <= newLogIndex - 1; i++ ) {
204             ApplyState applyState = applyStateList.get(i);
205             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
206             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
207         }
208
209         ApplyState last = applyStateList.get((int) newLogIndex - 1);
210         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
211         assertEquals("getIdentifier", "state-id", last.getIdentifier());
212     }
213
214     @Test
215     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
216         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
217
218         MockRaftActorContext actorContext = createActorContextWithFollower();
219
220         Map<String, String> leadersSnapshot = new HashMap<>();
221         leadersSnapshot.put("1", "A");
222         leadersSnapshot.put("2", "B");
223         leadersSnapshot.put("3", "C");
224
225         //clears leaders log
226         actorContext.getReplicatedLog().removeFrom(0);
227
228         final int followersLastIndex = 2;
229         final int snapshotIndex = 3;
230         final int newEntryIndex = 4;
231         final int snapshotTerm = 1;
232         final int currentTerm = 2;
233
234         // set the snapshot variables in replicatedlog
235         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
236         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
237         actorContext.setCommitIndex(followersLastIndex);
238         //set follower timeout to 2 mins, helps during debugging
239         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
240
241         leader = new Leader(actorContext);
242
243         // new entry
244         ReplicatedLogImplEntry entry =
245                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
246                         new MockRaftActorContext.MockPayload("D"));
247
248         //update follower timestamp
249         leader.markFollowerActive(FOLLOWER_ID);
250
251         ByteString bs = toByteString(leadersSnapshot);
252         leader.setSnapshot(Optional.of(bs));
253         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
254         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
255
256         //send first chunk and no InstallSnapshotReply received yet
257         fts.getNextChunk();
258         fts.incrementChunkIndex();
259
260         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
261                 TimeUnit.MILLISECONDS);
262
263         leader.handleMessage(leaderActor, new SendHeartBeat());
264
265         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
266
267         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
268
269         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
270
271         //InstallSnapshotReply received
272         fts.markSendStatus(true);
273
274         leader.handleMessage(leaderActor, new SendHeartBeat());
275
276         InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor,
277                 InstallSnapshot.SERIALIZABLE_CLASS);
278
279         InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
280
281         assertEquals(snapshotIndex, is.getLastIncludedIndex());
282     }
283
284     @Test
285     public void testSendAppendEntriesSnapshotScenario() throws Exception {
286         logStart("testSendAppendEntriesSnapshotScenario");
287
288         MockRaftActorContext actorContext = createActorContextWithFollower();
289
290         Map<String, String> leadersSnapshot = new HashMap<>();
291         leadersSnapshot.put("1", "A");
292         leadersSnapshot.put("2", "B");
293         leadersSnapshot.put("3", "C");
294
295         //clears leaders log
296         actorContext.getReplicatedLog().removeFrom(0);
297
298         final int followersLastIndex = 2;
299         final int snapshotIndex = 3;
300         final int newEntryIndex = 4;
301         final int snapshotTerm = 1;
302         final int currentTerm = 2;
303
304         // set the snapshot variables in replicatedlog
305         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
306         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
307         actorContext.setCommitIndex(followersLastIndex);
308
309         leader = new Leader(actorContext);
310
311         // Leader will send an immediate heartbeat - ignore it.
312         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
313
314         // new entry
315         ReplicatedLogImplEntry entry =
316                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
317                         new MockRaftActorContext.MockPayload("D"));
318
319         //update follower timestamp
320         leader.markFollowerActive(FOLLOWER_ID);
321
322         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
323         RaftActorBehavior raftBehavior = leader.handleMessage(
324                 leaderActor, new Replicate(null, "state-id", entry));
325
326         assertTrue(raftBehavior instanceof Leader);
327
328         MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
329     }
330
331     @Test
332     public void testInitiateInstallSnapshot() throws Exception {
333         logStart("testInitiateInstallSnapshot");
334
335         MockRaftActorContext actorContext = createActorContextWithFollower();
336
337         Map<String, String> leadersSnapshot = new HashMap<>();
338         leadersSnapshot.put("1", "A");
339         leadersSnapshot.put("2", "B");
340         leadersSnapshot.put("3", "C");
341
342         //clears leaders log
343         actorContext.getReplicatedLog().removeFrom(0);
344
345         final int followersLastIndex = 2;
346         final int snapshotIndex = 3;
347         final int newEntryIndex = 4;
348         final int snapshotTerm = 1;
349         final int currentTerm = 2;
350
351         // set the snapshot variables in replicatedlog
352         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
353         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
354         actorContext.setLastApplied(3);
355         actorContext.setCommitIndex(followersLastIndex);
356
357         leader = new Leader(actorContext);
358
359         // Leader will send an immediate heartbeat - ignore it.
360         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
361
362         // set the snapshot as absent and check if capture-snapshot is invoked.
363         leader.setSnapshot(Optional.<ByteString>absent());
364
365         // new entry
366         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
367                 new MockRaftActorContext.MockPayload("D"));
368
369         actorContext.getReplicatedLog().append(entry);
370
371         //update follower timestamp
372         leader.markFollowerActive(FOLLOWER_ID);
373
374         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
375
376         CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
377
378         assertTrue(cs.isInstallSnapshotInitiated());
379         assertEquals(3, cs.getLastAppliedIndex());
380         assertEquals(1, cs.getLastAppliedTerm());
381         assertEquals(4, cs.getLastIndex());
382         assertEquals(2, cs.getLastTerm());
383
384         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
385         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
386
387         List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
388         assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
389     }
390
391     @Test
392     public void testInstallSnapshot() throws Exception {
393         logStart("testInstallSnapshot");
394
395         MockRaftActorContext actorContext = createActorContextWithFollower();
396
397         Map<String, String> leadersSnapshot = new HashMap<>();
398         leadersSnapshot.put("1", "A");
399         leadersSnapshot.put("2", "B");
400         leadersSnapshot.put("3", "C");
401
402         //clears leaders log
403         actorContext.getReplicatedLog().removeFrom(0);
404
405         final int followersLastIndex = 2;
406         final int snapshotIndex = 3;
407         final int snapshotTerm = 1;
408         final int currentTerm = 2;
409
410         // set the snapshot variables in replicatedlog
411         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
412         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
413         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
414         actorContext.setCommitIndex(followersLastIndex);
415
416         leader = new Leader(actorContext);
417
418         // Ignore initial heartbeat.
419         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
420
421         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
422                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
423
424         assertTrue(raftBehavior instanceof Leader);
425
426         // check if installsnapshot gets called with the correct values.
427
428         InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable(
429                 MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class));
430
431         assertNotNull(installSnapshot.getData());
432         assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
433         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
434
435         // FIXME - we don't set the term in the serialized message.
436         //assertEquals(currentTerm, installSnapshot.getTerm());
437     }
438
439     @Test
440     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
441         logStart("testHandleInstallSnapshotReplyLastChunk");
442
443         MockRaftActorContext actorContext = createActorContextWithFollower();
444
445         final int followersLastIndex = 2;
446         final int snapshotIndex = 3;
447         final int snapshotTerm = 1;
448         final int currentTerm = 2;
449
450         actorContext.setCommitIndex(followersLastIndex);
451
452         leader = new Leader(actorContext);
453
454         // Ignore initial heartbeat.
455         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
456
457         Map<String, String> leadersSnapshot = new HashMap<>();
458         leadersSnapshot.put("1", "A");
459         leadersSnapshot.put("2", "B");
460         leadersSnapshot.put("3", "C");
461
462         // set the snapshot variables in replicatedlog
463
464         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
465         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
466         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
467
468         ByteString bs = toByteString(leadersSnapshot);
469         leader.setSnapshot(Optional.of(bs));
470         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
471         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
472         while(!fts.isLastChunk(fts.getChunkIndex())) {
473             fts.getNextChunk();
474             fts.incrementChunkIndex();
475         }
476
477         //clears leaders log
478         actorContext.getReplicatedLog().removeFrom(0);
479
480         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
481                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
482
483         assertTrue(raftBehavior instanceof Leader);
484
485         assertEquals(0, leader.followerSnapshotSize());
486         assertEquals(1, leader.followerLogSize());
487         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
488         assertNotNull(fli);
489         assertEquals(snapshotIndex, fli.getMatchIndex());
490         assertEquals(snapshotIndex, fli.getMatchIndex());
491         assertEquals(snapshotIndex + 1, fli.getNextIndex());
492     }
493
494     @Test
495     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
496         logStart("testSendSnapshotfromInstallSnapshotReply");
497
498         MockRaftActorContext actorContext = createActorContextWithFollower();
499
500         final int followersLastIndex = 2;
501         final int snapshotIndex = 3;
502         final int snapshotTerm = 1;
503         final int currentTerm = 2;
504
505         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
506             @Override
507             public int getSnapshotChunkSize() {
508                 return 50;
509             }
510         };
511         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
512         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
513
514         actorContext.setConfigParams(configParams);
515         actorContext.setCommitIndex(followersLastIndex);
516
517         leader = new Leader(actorContext);
518
519         Map<String, String> leadersSnapshot = new HashMap<>();
520         leadersSnapshot.put("1", "A");
521         leadersSnapshot.put("2", "B");
522         leadersSnapshot.put("3", "C");
523
524         // set the snapshot variables in replicatedlog
525         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
526         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
527         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
528
529         ByteString bs = toByteString(leadersSnapshot);
530         leader.setSnapshot(Optional.of(bs));
531
532         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
533
534         InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
535                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
536
537         assertEquals(1, installSnapshot.getChunkIndex());
538         assertEquals(3, installSnapshot.getTotalChunks());
539
540         followerActor.underlyingActor().clear();
541         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
542                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
543
544         installSnapshot = MessageCollectorActor.expectFirstMatching(
545                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
546
547         assertEquals(2, installSnapshot.getChunkIndex());
548         assertEquals(3, installSnapshot.getTotalChunks());
549
550         followerActor.underlyingActor().clear();
551         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
552                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
553
554         installSnapshot = MessageCollectorActor.expectFirstMatching(
555                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
556
557         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
558         followerActor.underlyingActor().clear();
559         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
560                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
561
562         installSnapshot = MessageCollectorActor.getFirstMatching(
563                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
564
565         Assert.assertNull(installSnapshot);
566     }
567
568
569     @Test
570     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
571         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
572
573         MockRaftActorContext actorContext = createActorContextWithFollower();
574
575         final int followersLastIndex = 2;
576         final int snapshotIndex = 3;
577         final int snapshotTerm = 1;
578         final int currentTerm = 2;
579
580         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
581             @Override
582             public int getSnapshotChunkSize() {
583                 return 50;
584             }
585         });
586
587         actorContext.setCommitIndex(followersLastIndex);
588
589         leader = new Leader(actorContext);
590
591         Map<String, String> leadersSnapshot = new HashMap<>();
592         leadersSnapshot.put("1", "A");
593         leadersSnapshot.put("2", "B");
594         leadersSnapshot.put("3", "C");
595
596         // set the snapshot variables in replicatedlog
597         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
598         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
599         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
600
601         ByteString bs = toByteString(leadersSnapshot);
602         leader.setSnapshot(Optional.of(bs));
603
604         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
605
606         InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
607                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
608
609         assertEquals(1, installSnapshot.getChunkIndex());
610         assertEquals(3, installSnapshot.getTotalChunks());
611
612         followerActor.underlyingActor().clear();
613
614         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
615                 FOLLOWER_ID, -1, false));
616
617         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
618                 TimeUnit.MILLISECONDS);
619
620         leader.handleMessage(leaderActor, new SendHeartBeat());
621
622         installSnapshot = MessageCollectorActor.expectFirstMatching(
623                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
624
625         assertEquals(1, installSnapshot.getChunkIndex());
626         assertEquals(3, installSnapshot.getTotalChunks());
627     }
628
629     @Test
630     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
631         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
632
633         MockRaftActorContext actorContext = createActorContextWithFollower();
634
635         final int followersLastIndex = 2;
636         final int snapshotIndex = 3;
637         final int snapshotTerm = 1;
638         final int currentTerm = 2;
639
640         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
641             @Override
642             public int getSnapshotChunkSize() {
643                 return 50;
644             }
645         });
646
647         actorContext.setCommitIndex(followersLastIndex);
648
649         leader = new Leader(actorContext);
650
651         Map<String, String> leadersSnapshot = new HashMap<>();
652         leadersSnapshot.put("1", "A");
653         leadersSnapshot.put("2", "B");
654         leadersSnapshot.put("3", "C");
655
656         // set the snapshot variables in replicatedlog
657         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
658         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
659         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
660
661         ByteString bs = toByteString(leadersSnapshot);
662         leader.setSnapshot(Optional.of(bs));
663
664         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
665
666         InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
667                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
668
669         assertEquals(1, installSnapshot.getChunkIndex());
670         assertEquals(3, installSnapshot.getTotalChunks());
671         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
672
673         int hashCode = installSnapshot.getData().hashCode();
674
675         followerActor.underlyingActor().clear();
676
677         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
678                 FOLLOWER_ID, 1, true));
679
680         installSnapshot = MessageCollectorActor.expectFirstMatching(
681                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
682
683         assertEquals(2, installSnapshot.getChunkIndex());
684         assertEquals(3, installSnapshot.getTotalChunks());
685         assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
686     }
687
688     @Test
689     public void testFollowerToSnapshotLogic() {
690         logStart("testFollowerToSnapshotLogic");
691
692         MockRaftActorContext actorContext = createActorContext();
693
694         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
695             @Override
696             public int getSnapshotChunkSize() {
697                 return 50;
698             }
699         });
700
701         leader = new Leader(actorContext);
702
703         Map<String, String> leadersSnapshot = new HashMap<>();
704         leadersSnapshot.put("1", "A");
705         leadersSnapshot.put("2", "B");
706         leadersSnapshot.put("3", "C");
707
708         ByteString bs = toByteString(leadersSnapshot);
709         byte[] barray = bs.toByteArray();
710
711         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
712         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
713
714         assertEquals(bs.size(), barray.length);
715
716         int chunkIndex=0;
717         for (int i=0; i < barray.length; i = i + 50) {
718             int j = i + 50;
719             chunkIndex++;
720
721             if (i + 50 > barray.length) {
722                 j = barray.length;
723             }
724
725             ByteString chunk = fts.getNextChunk();
726             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
727             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
728
729             fts.markSendStatus(true);
730             if (!fts.isLastChunk(chunkIndex)) {
731                 fts.incrementChunkIndex();
732             }
733         }
734
735         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
736     }
737
738     @Override protected RaftActorBehavior createBehavior(
739         RaftActorContext actorContext) {
740         return new Leader(actorContext);
741     }
742
743     @Override
744     protected MockRaftActorContext createActorContext() {
745         return createActorContext(leaderActor);
746     }
747
748     @Override
749     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
750         return createActorContext("leader", actorRef);
751     }
752
753     private MockRaftActorContext createActorContextWithFollower() {
754         MockRaftActorContext actorContext = createActorContext();
755         actorContext.setPeerAddresses(ImmutableMap.<String,String>builder().put(FOLLOWER_ID,
756                 followerActor.path().toString()).build());
757         return actorContext;
758     }
759
760     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
761         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
762         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
763         configParams.setElectionTimeoutFactor(100000);
764         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
765         context.setConfigParams(configParams);
766         return context;
767     }
768
769     private ByteString toByteString(Map<String, String> state) {
770         ByteArrayOutputStream b = null;
771         ObjectOutputStream o = null;
772         try {
773             try {
774                 b = new ByteArrayOutputStream();
775                 o = new ObjectOutputStream(b);
776                 o.writeObject(state);
777                 byte[] snapshotBytes = b.toByteArray();
778                 return ByteString.copyFrom(snapshotBytes);
779             } finally {
780                 if (o != null) {
781                     o.flush();
782                     o.close();
783                 }
784                 if (b != null) {
785                     b.close();
786                 }
787             }
788         } catch (IOException e) {
789             Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
790         }
791         return null;
792     }
793
794     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
795         AbstractRaftActorBehavior behavior;
796
797         @Override public void onReceive(Object message) throws Exception {
798             if(behavior != null) {
799                 behavior.handleMessage(sender(), message);
800             }
801
802             super.onReceive(message);
803         }
804
805         public static Props props() {
806             return Props.create(ForwardMessageToBehaviorActor.class);
807         }
808     }
809
810     @Test
811     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
812         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
813
814         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
815
816         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
817
818         Follower follower = new Follower(followerActorContext);
819         followerActor.underlyingActor().behavior = follower;
820
821         Map<String, String> peerAddresses = new HashMap<>();
822         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
823
824         leaderActorContext.setPeerAddresses(peerAddresses);
825
826         leaderActorContext.getReplicatedLog().removeFrom(0);
827
828         //create 3 entries
829         leaderActorContext.setReplicatedLog(
830                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
831
832         leaderActorContext.setCommitIndex(1);
833
834         followerActorContext.getReplicatedLog().removeFrom(0);
835
836         // follower too has the exact same log entries and has the same commit index
837         followerActorContext.setReplicatedLog(
838                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
839
840         followerActorContext.setCommitIndex(1);
841
842         leader = new Leader(leaderActorContext);
843
844         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
845
846         assertEquals(1, appendEntries.getLeaderCommit());
847         assertEquals(0, appendEntries.getEntries().size());
848         assertEquals(0, appendEntries.getPrevLogIndex());
849
850         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
851                 leaderActor, AppendEntriesReply.class);
852
853         assertEquals(2, appendEntriesReply.getLogLastIndex());
854         assertEquals(1, appendEntriesReply.getLogLastTerm());
855
856         // follower returns its next index
857         assertEquals(2, appendEntriesReply.getLogLastIndex());
858         assertEquals(1, appendEntriesReply.getLogLastTerm());
859
860         follower.close();
861     }
862
863     @Test
864     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
865         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
866
867         MockRaftActorContext leaderActorContext = createActorContext();
868
869         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
870
871         Follower follower = new Follower(followerActorContext);
872         followerActor.underlyingActor().behavior = follower;
873
874         Map<String, String> peerAddresses = new HashMap<>();
875         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
876
877         leaderActorContext.setPeerAddresses(peerAddresses);
878
879         leaderActorContext.getReplicatedLog().removeFrom(0);
880
881         leaderActorContext.setReplicatedLog(
882                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
883
884         leaderActorContext.setCommitIndex(1);
885
886         followerActorContext.getReplicatedLog().removeFrom(0);
887
888         followerActorContext.setReplicatedLog(
889                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
890
891         // follower has the same log entries but its commit index > leaders commit index
892         followerActorContext.setCommitIndex(2);
893
894         leader = new Leader(leaderActorContext);
895
896         // Initial heartbeat
897         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
898
899         assertEquals(1, appendEntries.getLeaderCommit());
900         assertEquals(0, appendEntries.getEntries().size());
901         assertEquals(0, appendEntries.getPrevLogIndex());
902
903         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
904                 leaderActor, AppendEntriesReply.class);
905
906         assertEquals(2, appendEntriesReply.getLogLastIndex());
907         assertEquals(1, appendEntriesReply.getLogLastTerm());
908
909         leaderActor.underlyingActor().behavior = leader;
910         leader.handleMessage(followerActor, appendEntriesReply);
911
912         leaderActor.underlyingActor().clear();
913         followerActor.underlyingActor().clear();
914
915         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
916                 TimeUnit.MILLISECONDS);
917
918         leader.handleMessage(leaderActor, new SendHeartBeat());
919
920         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
921
922         assertEquals(2, appendEntries.getLeaderCommit());
923         assertEquals(0, appendEntries.getEntries().size());
924         assertEquals(2, appendEntries.getPrevLogIndex());
925
926         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
927
928         assertEquals(2, appendEntriesReply.getLogLastIndex());
929         assertEquals(1, appendEntriesReply.getLogLastTerm());
930
931         assertEquals(2, followerActorContext.getCommitIndex());
932
933         follower.close();
934     }
935
936     @Test
937     public void testHandleAppendEntriesReplyFailure(){
938         logStart("testHandleAppendEntriesReplyFailure");
939
940         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
941
942         leader = new Leader(leaderActorContext);
943
944         // Send initial heartbeat reply with last index.
945         leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
946
947         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
948         assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
949
950         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
951
952         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
953
954         assertEquals(RaftState.Leader, raftActorBehavior.state());
955
956         assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
957     }
958
959     @Test
960     public void testHandleAppendEntriesReplySuccess() throws Exception {
961         logStart("testHandleAppendEntriesReplySuccess");
962
963         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
964
965         leaderActorContext.setReplicatedLog(
966                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
967
968         leaderActorContext.setCommitIndex(1);
969         leaderActorContext.setLastApplied(1);
970         leaderActorContext.getTermInformation().update(1, "leader");
971
972         leader = new Leader(leaderActorContext);
973
974         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
975
976         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
977
978         assertEquals(RaftState.Leader, raftActorBehavior.state());
979
980         assertEquals(2, leaderActorContext.getCommitIndex());
981
982         ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching(
983                 leaderActor, ApplyLogEntries.class);
984
985         assertEquals(2, leaderActorContext.getLastApplied());
986
987         assertEquals(2, applyLogEntries.getToIndex());
988
989         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
990                 ApplyState.class);
991
992         assertEquals(1,applyStateList.size());
993
994         ApplyState applyState = applyStateList.get(0);
995
996         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
997     }
998
999     @Test
1000     public void testHandleAppendEntriesReplyUnknownFollower(){
1001         logStart("testHandleAppendEntriesReplyUnknownFollower");
1002
1003         MockRaftActorContext leaderActorContext = createActorContext();
1004
1005         leader = new Leader(leaderActorContext);
1006
1007         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
1008
1009         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1010
1011         assertEquals(RaftState.Leader, raftActorBehavior.state());
1012     }
1013
1014     @Test
1015     public void testHandleRequestVoteReply(){
1016         logStart("testHandleRequestVoteReply");
1017
1018         MockRaftActorContext leaderActorContext = createActorContext();
1019
1020         leader = new Leader(leaderActorContext);
1021
1022         // Should be a no-op.
1023         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1024                 new RequestVoteReply(1, true));
1025
1026         assertEquals(RaftState.Leader, raftActorBehavior.state());
1027
1028         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1029
1030         assertEquals(RaftState.Leader, raftActorBehavior.state());
1031     }
1032
1033     @Test
1034     public void testIsolatedLeaderCheckNoFollowers() {
1035         logStart("testIsolatedLeaderCheckNoFollowers");
1036
1037         MockRaftActorContext leaderActorContext = createActorContext();
1038
1039         leader = new Leader(leaderActorContext);
1040         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1041         Assert.assertTrue(behavior instanceof Leader);
1042     }
1043
1044     @Test
1045     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1046         logStart("testIsolatedLeaderCheckTwoFollowers");
1047
1048         new JavaTestKit(getSystem()) {{
1049
1050             ActorRef followerActor1 = getTestActor();
1051             ActorRef followerActor2 = getTestActor();
1052
1053             MockRaftActorContext leaderActorContext = createActorContext();
1054
1055             Map<String, String> peerAddresses = new HashMap<>();
1056             peerAddresses.put("follower-1", followerActor1.path().toString());
1057             peerAddresses.put("follower-2", followerActor2.path().toString());
1058
1059             leaderActorContext.setPeerAddresses(peerAddresses);
1060
1061             leader = new Leader(leaderActorContext);
1062             leader.stopIsolatedLeaderCheckSchedule();
1063
1064             leader.markFollowerActive("follower-1");
1065             leader.markFollowerActive("follower-2");
1066             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1067             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1068                 behavior instanceof Leader);
1069
1070             // kill 1 follower and verify if that got killed
1071             final JavaTestKit probe = new JavaTestKit(getSystem());
1072             probe.watch(followerActor1);
1073             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1074             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1075             assertEquals(termMsg1.getActor(), followerActor1);
1076
1077             leader.markFollowerInActive("follower-1");
1078             leader.markFollowerActive("follower-2");
1079             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1080             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1081                 behavior instanceof Leader);
1082
1083             // kill 2nd follower and leader should change to Isolated leader
1084             followerActor2.tell(PoisonPill.getInstance(), null);
1085             probe.watch(followerActor2);
1086             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1087             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1088             assertEquals(termMsg2.getActor(), followerActor2);
1089
1090             leader.markFollowerInActive("follower-2");
1091             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1092             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1093                 behavior instanceof IsolatedLeader);
1094         }};
1095     }
1096
1097
1098     @Test
1099     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1100         logStart("testAppendEntryCallAtEndofAppendEntryReply");
1101
1102         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1103
1104         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1105         //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1106         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1107
1108         leaderActorContext.setConfigParams(configParams);
1109
1110         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1111
1112         followerActorContext.setConfigParams(configParams);
1113
1114         Follower follower = new Follower(followerActorContext);
1115         followerActor.underlyingActor().behavior = follower;
1116
1117         leaderActorContext.getReplicatedLog().removeFrom(0);
1118         leaderActorContext.setCommitIndex(-1);
1119         leaderActorContext.setLastApplied(-1);
1120
1121         followerActorContext.getReplicatedLog().removeFrom(0);
1122         followerActorContext.setCommitIndex(-1);
1123         followerActorContext.setLastApplied(-1);
1124
1125         leader = new Leader(leaderActorContext);
1126
1127         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1128                 leaderActor, AppendEntriesReply.class);
1129
1130         leader.handleMessage(followerActor, appendEntriesReply);
1131
1132         // Clear initial heartbeat messages
1133
1134         leaderActor.underlyingActor().clear();
1135         followerActor.underlyingActor().clear();
1136
1137         // create 3 entries
1138         leaderActorContext.setReplicatedLog(
1139                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1140         leaderActorContext.setCommitIndex(1);
1141         leaderActorContext.setLastApplied(1);
1142
1143         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1144                 TimeUnit.MILLISECONDS);
1145
1146         leader.handleMessage(leaderActor, new SendHeartBeat());
1147
1148         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1149
1150         // Should send first log entry
1151         assertEquals(1, appendEntries.getLeaderCommit());
1152         assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1153         assertEquals(-1, appendEntries.getPrevLogIndex());
1154
1155         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1156
1157         assertEquals(1, appendEntriesReply.getLogLastTerm());
1158         assertEquals(0, appendEntriesReply.getLogLastIndex());
1159
1160         followerActor.underlyingActor().clear();
1161
1162         leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1163
1164         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1165
1166         // Should send second log entry
1167         assertEquals(1, appendEntries.getLeaderCommit());
1168         assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1169
1170         follower.close();
1171     }
1172
1173     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1174
1175         private final long electionTimeOutIntervalMillis;
1176         private final int snapshotChunkSize;
1177
1178         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1179             super();
1180             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1181             this.snapshotChunkSize = snapshotChunkSize;
1182         }
1183
1184         @Override
1185         public FiniteDuration getElectionTimeOutInterval() {
1186             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1187         }
1188
1189         @Override
1190         public int getSnapshotChunkSize() {
1191             return snapshotChunkSize;
1192         }
1193     }
1194 }