Optimized AppendEntries sending to follower by adding AppendEntry call at end of...
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Assert;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftState;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
47 import scala.concurrent.duration.FiniteDuration;
48
49 public class LeaderTest extends AbstractRaftActorBehaviorTest {
50
51     private final ActorRef leaderActor =
52         getSystem().actorOf(Props.create(DoNothingActor.class));
53     private final ActorRef senderActor =
54         getSystem().actorOf(Props.create(DoNothingActor.class));
55
56     @Test
57     public void testHandleMessageForUnknownMessage() throws Exception {
58         new JavaTestKit(getSystem()) {{
59             Leader leader =
60                 new Leader(createActorContext());
61
62             // handle message should return the Leader state when it receives an
63             // unknown message
64             RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
65             Assert.assertTrue(behavior instanceof Leader);
66         }};
67     }
68
69     @Test
70     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
71         new JavaTestKit(getSystem()) {{
72
73             new Within(duration("1 seconds")) {
74                 @Override
75                 protected void run() {
76
77                     ActorRef followerActor = getTestActor();
78
79                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
80
81                     Map<String, String> peerAddresses = new HashMap<>();
82
83                     peerAddresses.put(followerActor.path().toString(),
84                         followerActor.path().toString());
85
86                     actorContext.setPeerAddresses(peerAddresses);
87
88                     Leader leader = new Leader(actorContext);
89                     leader.markFollowerActive(followerActor.path().toString());
90                     Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
91                         TimeUnit.MILLISECONDS);
92                     leader.handleMessage(senderActor, new SendHeartBeat());
93
94                     final String out =
95                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
96                             // do not put code outside this method, will run afterwards
97                             @Override
98                             protected String match(Object in) {
99                                 Object msg = fromSerializableMessage(in);
100                                 if (msg instanceof AppendEntries) {
101                                     if (((AppendEntries)msg).getTerm() == 0) {
102                                         return "match";
103                                     }
104                                     return null;
105                                 } else {
106                                     throw noMatch();
107                                 }
108                             }
109                         }.get(); // this extracts the received message
110
111                     assertEquals("match", out);
112
113                 }
114             };
115         }};
116     }
117
118     @Test
119     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
120         new JavaTestKit(getSystem()) {{
121
122             new Within(duration("1 seconds")) {
123                 @Override
124                 protected void run() {
125
126                     ActorRef followerActor = getTestActor();
127
128                     MockRaftActorContext actorContext =
129                         (MockRaftActorContext) createActorContext();
130
131                     Map<String, String> peerAddresses = new HashMap<>();
132
133                     peerAddresses.put(followerActor.path().toString(),
134                             followerActor.path().toString());
135
136                     actorContext.setPeerAddresses(peerAddresses);
137
138                     Leader leader = new Leader(actorContext);
139                     leader.markFollowerActive(followerActor.path().toString());
140                     Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
141                         TimeUnit.MILLISECONDS);
142                     RaftActorBehavior raftBehavior = leader
143                         .handleMessage(senderActor, new Replicate(null, null,
144                             new MockRaftActorContext.MockReplicatedLogEntry(1,
145                                 100,
146                                 new MockRaftActorContext.MockPayload("foo"))
147                         ));
148
149                     // State should not change
150                     assertTrue(raftBehavior instanceof Leader);
151
152                     final String out =
153                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
154                             // do not put code outside this method, will run afterwards
155                             @Override
156                             protected String match(Object in) {
157                                 Object msg = fromSerializableMessage(in);
158                                 if (msg instanceof AppendEntries) {
159                                     if (((AppendEntries)msg).getTerm() == 0) {
160                                         return "match";
161                                     }
162                                     return null;
163                                 } else {
164                                     throw noMatch();
165                                 }
166                             }
167                         }.get(); // this extracts the received message
168
169                     assertEquals("match", out);
170                 }
171             };
172         }};
173     }
174
175     @Test
176     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
177         new JavaTestKit(getSystem()) {{
178
179             new Within(duration("1 seconds")) {
180                 @Override
181                 protected void run() {
182
183                     ActorRef raftActor = getTestActor();
184
185                     MockRaftActorContext actorContext =
186                         new MockRaftActorContext("test", getSystem(), raftActor);
187
188                     actorContext.getReplicatedLog().removeFrom(0);
189
190                     actorContext.setReplicatedLog(
191                         new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
192                             .build());
193
194                     Leader leader = new Leader(actorContext);
195                     RaftActorBehavior raftBehavior = leader
196                         .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
197
198                     // State should not change
199                     assertTrue(raftBehavior instanceof Leader);
200
201                     assertEquals(1, actorContext.getCommitIndex());
202
203                     final String out =
204                         new ExpectMsg<String>(duration("1 seconds"),
205                             "match hint") {
206                             // do not put code outside this method, will run afterwards
207                             @Override
208                             protected String match(Object in) {
209                                 if (in instanceof ApplyState) {
210                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
211                                         return "match";
212                                     }
213                                     return null;
214                                 } else {
215                                     throw noMatch();
216                                 }
217                             }
218                         }.get(); // this extracts the received message
219
220                     assertEquals("match", out);
221
222                 }
223             };
224         }};
225     }
226
227     @Test
228     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
229         new JavaTestKit(getSystem()) {{
230             ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
231
232             Map<String, String> peerAddresses = new HashMap<>();
233             peerAddresses.put(followerActor.path().toString(),
234                 followerActor.path().toString());
235
236             MockRaftActorContext actorContext =
237                 (MockRaftActorContext) createActorContext(leaderActor);
238             actorContext.setPeerAddresses(peerAddresses);
239
240             Map<String, String> leadersSnapshot = new HashMap<>();
241             leadersSnapshot.put("1", "A");
242             leadersSnapshot.put("2", "B");
243             leadersSnapshot.put("3", "C");
244
245             //clears leaders log
246             actorContext.getReplicatedLog().removeFrom(0);
247
248             final int followersLastIndex = 2;
249             final int snapshotIndex = 3;
250             final int newEntryIndex = 4;
251             final int snapshotTerm = 1;
252             final int currentTerm = 2;
253
254             // set the snapshot variables in replicatedlog
255             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
256             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
257             actorContext.setCommitIndex(followersLastIndex);
258             //set follower timeout to 2 mins, helps during debugging
259             actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
260
261             MockLeader leader = new MockLeader(actorContext);
262
263             // new entry
264             ReplicatedLogImplEntry entry =
265                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
266                     new MockRaftActorContext.MockPayload("D"));
267
268             //update follower timestamp
269             leader.markFollowerActive(followerActor.path().toString());
270
271             ByteString bs = toByteString(leadersSnapshot);
272             leader.setSnapshot(Optional.of(bs));
273             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
274
275             //send first chunk and no InstallSnapshotReply received yet
276             leader.getFollowerToSnapshot().getNextChunk();
277             leader.getFollowerToSnapshot().incrementChunkIndex();
278
279             Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
280                 TimeUnit.MILLISECONDS);
281
282             leader.handleMessage(leaderActor, new SendHeartBeat());
283
284             AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
285                 followerActor, AppendEntries.class);
286
287             assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
288                 "received", aeproto);
289
290             AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
291
292             assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
293
294             //InstallSnapshotReply received
295             leader.getFollowerToSnapshot().markSendStatus(true);
296
297             leader.handleMessage(senderActor, new SendHeartBeat());
298
299             InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
300                 MessageCollectorActor.getFirstMatching(followerActor,
301                     InstallSnapshot.SERIALIZABLE_CLASS);
302
303             assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
304                 isproto);
305
306             InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
307
308             assertEquals(snapshotIndex, is.getLastIncludedIndex());
309
310         }};
311     }
312
313     @Test
314     public void testSendAppendEntriesSnapshotScenario() {
315         new JavaTestKit(getSystem()) {{
316
317             ActorRef followerActor = getTestActor();
318
319             Map<String, String> peerAddresses = new HashMap<>();
320             peerAddresses.put(followerActor.path().toString(),
321                 followerActor.path().toString());
322
323             MockRaftActorContext actorContext =
324                 (MockRaftActorContext) createActorContext(getRef());
325             actorContext.setPeerAddresses(peerAddresses);
326
327             Map<String, String> leadersSnapshot = new HashMap<>();
328             leadersSnapshot.put("1", "A");
329             leadersSnapshot.put("2", "B");
330             leadersSnapshot.put("3", "C");
331
332             //clears leaders log
333             actorContext.getReplicatedLog().removeFrom(0);
334
335             final int followersLastIndex = 2;
336             final int snapshotIndex = 3;
337             final int newEntryIndex = 4;
338             final int snapshotTerm = 1;
339             final int currentTerm = 2;
340
341             // set the snapshot variables in replicatedlog
342             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
343             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
344             actorContext.setCommitIndex(followersLastIndex);
345
346             Leader leader = new Leader(actorContext);
347
348             // new entry
349             ReplicatedLogImplEntry entry =
350                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
351                     new MockRaftActorContext.MockPayload("D"));
352
353             //update follower timestamp
354             leader.markFollowerActive(followerActor.path().toString());
355
356             Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
357                 TimeUnit.MILLISECONDS);
358
359             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
360             RaftActorBehavior raftBehavior = leader.handleMessage(
361                 senderActor, new Replicate(null, "state-id", entry));
362
363             assertTrue(raftBehavior instanceof Leader);
364
365             // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
366             Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
367                 @Override
368                 protected Boolean match(Object o) throws Exception {
369                     if (o instanceof InitiateInstallSnapshot) {
370                         return true;
371                     }
372                     return false;
373                 }
374             }.get();
375
376             boolean initiateInitiateInstallSnapshot = false;
377             for (Boolean b: matches) {
378                 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
379             }
380
381             assertTrue(initiateInitiateInstallSnapshot);
382         }};
383     }
384
385     @Test
386     public void testInitiateInstallSnapshot() throws Exception {
387         new JavaTestKit(getSystem()) {{
388
389             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
390
391             ActorRef followerActor = getTestActor();
392
393             Map<String, String> peerAddresses = new HashMap<>();
394             peerAddresses.put(followerActor.path().toString(),
395                 followerActor.path().toString());
396
397
398             MockRaftActorContext actorContext =
399                 (MockRaftActorContext) createActorContext(leaderActor);
400             actorContext.setPeerAddresses(peerAddresses);
401
402             Map<String, String> leadersSnapshot = new HashMap<>();
403             leadersSnapshot.put("1", "A");
404             leadersSnapshot.put("2", "B");
405             leadersSnapshot.put("3", "C");
406
407             //clears leaders log
408             actorContext.getReplicatedLog().removeFrom(0);
409
410             final int followersLastIndex = 2;
411             final int snapshotIndex = 3;
412             final int newEntryIndex = 4;
413             final int snapshotTerm = 1;
414             final int currentTerm = 2;
415
416             // set the snapshot variables in replicatedlog
417             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
418             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
419             actorContext.setLastApplied(3);
420             actorContext.setCommitIndex(followersLastIndex);
421
422             Leader leader = new Leader(actorContext);
423             // set the snapshot as absent and check if capture-snapshot is invoked.
424             leader.setSnapshot(Optional.<ByteString>absent());
425
426             // new entry
427             ReplicatedLogImplEntry entry =
428                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
429                     new MockRaftActorContext.MockPayload("D"));
430
431             actorContext.getReplicatedLog().append(entry);
432
433             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
434             RaftActorBehavior raftBehavior = leader.handleMessage(
435                 leaderActor, new InitiateInstallSnapshot());
436
437             CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
438                 getFirstMatching(leaderActor, CaptureSnapshot.class);
439
440             assertNotNull(cs);
441
442             assertTrue(cs.isInstallSnapshotInitiated());
443             assertEquals(3, cs.getLastAppliedIndex());
444             assertEquals(1, cs.getLastAppliedTerm());
445             assertEquals(4, cs.getLastIndex());
446             assertEquals(2, cs.getLastTerm());
447         }};
448     }
449
450     @Test
451     public void testInstallSnapshot() {
452         new JavaTestKit(getSystem()) {{
453
454             ActorRef followerActor = getTestActor();
455
456             Map<String, String> peerAddresses = new HashMap<>();
457             peerAddresses.put(followerActor.path().toString(),
458                 followerActor.path().toString());
459
460             MockRaftActorContext actorContext =
461                 (MockRaftActorContext) createActorContext();
462             actorContext.setPeerAddresses(peerAddresses);
463
464
465             Map<String, String> leadersSnapshot = new HashMap<>();
466             leadersSnapshot.put("1", "A");
467             leadersSnapshot.put("2", "B");
468             leadersSnapshot.put("3", "C");
469
470             //clears leaders log
471             actorContext.getReplicatedLog().removeFrom(0);
472
473             final int followersLastIndex = 2;
474             final int snapshotIndex = 3;
475             final int newEntryIndex = 4;
476             final int snapshotTerm = 1;
477             final int currentTerm = 2;
478
479             // set the snapshot variables in replicatedlog
480             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
481             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
482             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
483             actorContext.setCommitIndex(followersLastIndex);
484
485             Leader leader = new Leader(actorContext);
486
487             // new entry
488             ReplicatedLogImplEntry entry =
489                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
490                     new MockRaftActorContext.MockPayload("D"));
491
492             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
493                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
494
495             assertTrue(raftBehavior instanceof Leader);
496
497             // check if installsnapshot gets called with the correct values.
498             final String out =
499                 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
500                     // do not put code outside this method, will run afterwards
501                     @Override
502                     protected String match(Object in) {
503                         if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
504                             InstallSnapshot is = (InstallSnapshot)
505                                 SerializationUtils.fromSerializable(in);
506                             if (is.getData() == null) {
507                                 return "InstallSnapshot data is null";
508                             }
509                             if (is.getLastIncludedIndex() != snapshotIndex) {
510                                 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
511                             }
512                             if (is.getLastIncludedTerm() != snapshotTerm) {
513                                 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
514                             }
515                             if (is.getTerm() == currentTerm) {
516                                 return is.getTerm() + "!=" + currentTerm;
517                             }
518
519                             return "match";
520
521                         } else {
522                             return "message mismatch:" + in.getClass();
523                         }
524                     }
525                 }.get(); // this extracts the received message
526
527             assertEquals("match", out);
528         }};
529     }
530
531     @Test
532     public void testHandleInstallSnapshotReplyLastChunk() {
533         new JavaTestKit(getSystem()) {{
534
535             ActorRef followerActor = getTestActor();
536
537             Map<String, String> peerAddresses = new HashMap<>();
538             peerAddresses.put(followerActor.path().toString(),
539                 followerActor.path().toString());
540
541             final int followersLastIndex = 2;
542             final int snapshotIndex = 3;
543             final int newEntryIndex = 4;
544             final int snapshotTerm = 1;
545             final int currentTerm = 2;
546
547             MockRaftActorContext actorContext =
548                 (MockRaftActorContext) createActorContext();
549             actorContext.setPeerAddresses(peerAddresses);
550             actorContext.setCommitIndex(followersLastIndex);
551
552             MockLeader leader = new MockLeader(actorContext);
553
554             Map<String, String> leadersSnapshot = new HashMap<>();
555             leadersSnapshot.put("1", "A");
556             leadersSnapshot.put("2", "B");
557             leadersSnapshot.put("3", "C");
558
559             // set the snapshot variables in replicatedlog
560
561             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
562             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
563             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
564
565             ByteString bs = toByteString(leadersSnapshot);
566             leader.setSnapshot(Optional.of(bs));
567             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
568             while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
569                 leader.getFollowerToSnapshot().getNextChunk();
570                 leader.getFollowerToSnapshot().incrementChunkIndex();
571             }
572
573             //clears leaders log
574             actorContext.getReplicatedLog().removeFrom(0);
575
576             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
577                 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
578                     leader.getFollowerToSnapshot().getChunkIndex(), true));
579
580             assertTrue(raftBehavior instanceof Leader);
581
582             assertEquals(0, leader.followerSnapshotSize());
583             assertEquals(1, leader.followerLogSize());
584             assertNotNull(leader.getFollower(followerActor.path().toString()));
585             FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
586             assertEquals(snapshotIndex, fli.getMatchIndex());
587             assertEquals(snapshotIndex, fli.getMatchIndex());
588             assertEquals(snapshotIndex + 1, fli.getNextIndex());
589         }};
590     }
591
592     @Test
593     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
594         new JavaTestKit(getSystem()) {{
595
596             TestActorRef<MessageCollectorActor> followerActor =
597                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
598
599             Map<String, String> peerAddresses = new HashMap<>();
600             peerAddresses.put(followerActor.path().toString(),
601                     followerActor.path().toString());
602
603             final int followersLastIndex = 2;
604             final int snapshotIndex = 3;
605             final int snapshotTerm = 1;
606             final int currentTerm = 2;
607
608             MockRaftActorContext actorContext =
609                     (MockRaftActorContext) createActorContext();
610
611             actorContext.setConfigParams(new DefaultConfigParamsImpl(){
612                 @Override
613                 public int getSnapshotChunkSize() {
614                     return 50;
615                 }
616             });
617             actorContext.setPeerAddresses(peerAddresses);
618             actorContext.setCommitIndex(followersLastIndex);
619
620             MockLeader leader = new MockLeader(actorContext);
621
622             Map<String, String> leadersSnapshot = new HashMap<>();
623             leadersSnapshot.put("1", "A");
624             leadersSnapshot.put("2", "B");
625             leadersSnapshot.put("3", "C");
626
627             // set the snapshot variables in replicatedlog
628             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
629             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
630             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
631
632             ByteString bs = toByteString(leadersSnapshot);
633             leader.setSnapshot(Optional.of(bs));
634
635             leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
636
637             Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
638
639             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
640
641             InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
642
643             assertEquals(1, installSnapshot.getChunkIndex());
644             assertEquals(3, installSnapshot.getTotalChunks());
645
646
647             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
648                 followerActor.path().toString(), -1, false));
649
650             Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
651                 TimeUnit.MILLISECONDS);
652
653             leader.handleMessage(leaderActor, new SendHeartBeat());
654
655             o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
656
657             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
658
659             installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
660
661             assertEquals(1, installSnapshot.getChunkIndex());
662             assertEquals(3, installSnapshot.getTotalChunks());
663
664             followerActor.tell(PoisonPill.getInstance(), getRef());
665         }};
666     }
667
668     @Test
669     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
670         new JavaTestKit(getSystem()) {
671             {
672
673                 TestActorRef<MessageCollectorActor> followerActor =
674                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
675
676                 Map<String, String> peerAddresses = new HashMap<>();
677                 peerAddresses.put(followerActor.path().toString(),
678                         followerActor.path().toString());
679
680                 final int followersLastIndex = 2;
681                 final int snapshotIndex = 3;
682                 final int snapshotTerm = 1;
683                 final int currentTerm = 2;
684
685                 MockRaftActorContext actorContext =
686                         (MockRaftActorContext) createActorContext();
687
688                 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
689                     @Override
690                     public int getSnapshotChunkSize() {
691                         return 50;
692                     }
693                 });
694                 actorContext.setPeerAddresses(peerAddresses);
695                 actorContext.setCommitIndex(followersLastIndex);
696
697                 MockLeader leader = new MockLeader(actorContext);
698
699                 Map<String, String> leadersSnapshot = new HashMap<>();
700                 leadersSnapshot.put("1", "A");
701                 leadersSnapshot.put("2", "B");
702                 leadersSnapshot.put("3", "C");
703
704                 // set the snapshot variables in replicatedlog
705                 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
706                 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
707                 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
708
709                 ByteString bs = toByteString(leadersSnapshot);
710                 leader.setSnapshot(Optional.of(bs));
711
712                 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
713
714                 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
715
716                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
717
718                 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
719
720                 assertEquals(1, installSnapshot.getChunkIndex());
721                 assertEquals(3, installSnapshot.getTotalChunks());
722                 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
723
724                 int hashCode = installSnapshot.getData().hashCode();
725
726                 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
727
728                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
729
730                 leader.handleMessage(leaderActor, new SendHeartBeat());
731
732                 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
733
734                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
735
736                 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
737
738                 assertEquals(2, installSnapshot.getChunkIndex());
739                 assertEquals(3, installSnapshot.getTotalChunks());
740                 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
741
742                 followerActor.tell(PoisonPill.getInstance(), getRef());
743             }};
744     }
745
746     @Test
747     public void testFollowerToSnapshotLogic() {
748
749         MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
750
751         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
752             @Override
753             public int getSnapshotChunkSize() {
754                 return 50;
755             }
756         });
757
758         MockLeader leader = new MockLeader(actorContext);
759
760         Map<String, String> leadersSnapshot = new HashMap<>();
761         leadersSnapshot.put("1", "A");
762         leadersSnapshot.put("2", "B");
763         leadersSnapshot.put("3", "C");
764
765         ByteString bs = toByteString(leadersSnapshot);
766         byte[] barray = bs.toByteArray();
767
768         leader.createFollowerToSnapshot("followerId", bs);
769         assertEquals(bs.size(), barray.length);
770
771         int chunkIndex=0;
772         for (int i=0; i < barray.length; i = i + 50) {
773             int j = i + 50;
774             chunkIndex++;
775
776             if (i + 50 > barray.length) {
777                 j = barray.length;
778             }
779
780             ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
781             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
782             assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
783
784             leader.getFollowerToSnapshot().markSendStatus(true);
785             if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
786                 leader.getFollowerToSnapshot().incrementChunkIndex();
787             }
788         }
789
790         assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
791     }
792
793
794     @Override protected RaftActorBehavior createBehavior(
795         RaftActorContext actorContext) {
796         return new Leader(actorContext);
797     }
798
799     @Override protected RaftActorContext createActorContext() {
800         return createActorContext(leaderActor);
801     }
802
803     @Override
804     protected RaftActorContext createActorContext(ActorRef actorRef) {
805         return new MockRaftActorContext("test", getSystem(), actorRef);
806     }
807
808     private ByteString toByteString(Map<String, String> state) {
809         ByteArrayOutputStream b = null;
810         ObjectOutputStream o = null;
811         try {
812             try {
813                 b = new ByteArrayOutputStream();
814                 o = new ObjectOutputStream(b);
815                 o.writeObject(state);
816                 byte[] snapshotBytes = b.toByteArray();
817                 return ByteString.copyFrom(snapshotBytes);
818             } finally {
819                 if (o != null) {
820                     o.flush();
821                     o.close();
822                 }
823                 if (b != null) {
824                     b.close();
825                 }
826             }
827         } catch (IOException e) {
828             Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
829         }
830         return null;
831     }
832
833     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
834         private static AbstractRaftActorBehavior behavior;
835
836         public ForwardMessageToBehaviorActor(){
837
838         }
839
840         @Override public void onReceive(Object message) throws Exception {
841             super.onReceive(message);
842             behavior.handleMessage(sender(), message);
843         }
844
845         public static void setBehavior(AbstractRaftActorBehavior behavior){
846             ForwardMessageToBehaviorActor.behavior = behavior;
847         }
848     }
849
850     @Test
851     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
852         new JavaTestKit(getSystem()) {{
853
854             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
855
856             MockRaftActorContext leaderActorContext =
857                 new MockRaftActorContext("leader", getSystem(), leaderActor);
858
859             ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
860
861             MockRaftActorContext followerActorContext =
862                 new MockRaftActorContext("follower", getSystem(), followerActor);
863
864             Follower follower = new Follower(followerActorContext);
865
866             ForwardMessageToBehaviorActor.setBehavior(follower);
867
868             Map<String, String> peerAddresses = new HashMap<>();
869             peerAddresses.put(followerActor.path().toString(),
870                 followerActor.path().toString());
871
872             leaderActorContext.setPeerAddresses(peerAddresses);
873
874             leaderActorContext.getReplicatedLog().removeFrom(0);
875
876             //create 3 entries
877             leaderActorContext.setReplicatedLog(
878                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
879
880             leaderActorContext.setCommitIndex(1);
881
882             followerActorContext.getReplicatedLog().removeFrom(0);
883
884             // follower too has the exact same log entries and has the same commit index
885             followerActorContext.setReplicatedLog(
886                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
887
888             followerActorContext.setCommitIndex(1);
889
890             Leader leader = new Leader(leaderActorContext);
891             leader.markFollowerActive(followerActor.path().toString());
892
893             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
894                 TimeUnit.MILLISECONDS);
895
896             leader.handleMessage(leaderActor, new SendHeartBeat());
897
898             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
899                     .getFirstMatching(followerActor, AppendEntries.class);
900
901             assertNotNull(appendEntries);
902
903             assertEquals(1, appendEntries.getLeaderCommit());
904             assertEquals(1, appendEntries.getEntries().get(0).getIndex());
905             assertEquals(0, appendEntries.getPrevLogIndex());
906
907             AppendEntriesReply appendEntriesReply =
908                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
909                     leaderActor, AppendEntriesReply.class);
910
911             assertNotNull(appendEntriesReply);
912
913             // follower returns its next index
914             assertEquals(2, appendEntriesReply.getLogLastIndex());
915             assertEquals(1, appendEntriesReply.getLogLastTerm());
916
917         }};
918     }
919
920
921     @Test
922     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
923         new JavaTestKit(getSystem()) {{
924
925             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
926
927             MockRaftActorContext leaderActorContext =
928                 new MockRaftActorContext("leader", getSystem(), leaderActor);
929
930             ActorRef followerActor = getSystem().actorOf(
931                 Props.create(ForwardMessageToBehaviorActor.class));
932
933             MockRaftActorContext followerActorContext =
934                 new MockRaftActorContext("follower", getSystem(), followerActor);
935
936             Follower follower = new Follower(followerActorContext);
937
938             ForwardMessageToBehaviorActor.setBehavior(follower);
939
940             Map<String, String> peerAddresses = new HashMap<>();
941             peerAddresses.put(followerActor.path().toString(),
942                 followerActor.path().toString());
943
944             leaderActorContext.setPeerAddresses(peerAddresses);
945
946             leaderActorContext.getReplicatedLog().removeFrom(0);
947
948             leaderActorContext.setReplicatedLog(
949                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
950
951             leaderActorContext.setCommitIndex(1);
952
953             followerActorContext.getReplicatedLog().removeFrom(0);
954
955             followerActorContext.setReplicatedLog(
956                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
957
958             // follower has the same log entries but its commit index > leaders commit index
959             followerActorContext.setCommitIndex(2);
960
961             Leader leader = new Leader(leaderActorContext);
962             leader.markFollowerActive(followerActor.path().toString());
963
964             Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
965
966             leader.handleMessage(leaderActor, new SendHeartBeat());
967
968             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
969                     .getFirstMatching(followerActor, AppendEntries.class);
970
971             assertNotNull(appendEntries);
972
973             assertEquals(1, appendEntries.getLeaderCommit());
974             assertEquals(1, appendEntries.getEntries().get(0).getIndex());
975             assertEquals(0, appendEntries.getPrevLogIndex());
976
977             AppendEntriesReply appendEntriesReply =
978                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
979                     leaderActor, AppendEntriesReply.class);
980
981             assertNotNull(appendEntriesReply);
982
983             assertEquals(2, appendEntriesReply.getLogLastIndex());
984             assertEquals(1, appendEntriesReply.getLogLastTerm());
985
986         }};
987     }
988
989     @Test
990     public void testHandleAppendEntriesReplyFailure(){
991         new JavaTestKit(getSystem()) {
992             {
993
994                 ActorRef leaderActor =
995                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
996
997                 ActorRef followerActor =
998                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
999
1000
1001                 MockRaftActorContext leaderActorContext =
1002                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1003
1004                 Map<String, String> peerAddresses = new HashMap<>();
1005                 peerAddresses.put("follower-1",
1006                     followerActor.path().toString());
1007
1008                 leaderActorContext.setPeerAddresses(peerAddresses);
1009
1010                 Leader leader = new Leader(leaderActorContext);
1011
1012                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1013
1014                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1015
1016                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1017
1018             }};
1019     }
1020
1021     @Test
1022     public void testHandleAppendEntriesReplySuccess() throws Exception {
1023         new JavaTestKit(getSystem()) {
1024             {
1025
1026                 ActorRef leaderActor =
1027                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1028
1029                 ActorRef followerActor =
1030                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1031
1032
1033                 MockRaftActorContext leaderActorContext =
1034                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1035
1036                 leaderActorContext.setReplicatedLog(
1037                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1038
1039                 Map<String, String> peerAddresses = new HashMap<>();
1040                 peerAddresses.put("follower-1",
1041                     followerActor.path().toString());
1042
1043                 leaderActorContext.setPeerAddresses(peerAddresses);
1044                 leaderActorContext.setCommitIndex(1);
1045                 leaderActorContext.setLastApplied(1);
1046                 leaderActorContext.getTermInformation().update(1, "leader");
1047
1048                 Leader leader = new Leader(leaderActorContext);
1049
1050                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1051
1052                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1053
1054                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1055
1056                 assertEquals(2, leaderActorContext.getCommitIndex());
1057
1058                 ApplyLogEntries applyLogEntries =
1059                     (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
1060                         ApplyLogEntries.class);
1061
1062                 assertNotNull(applyLogEntries);
1063
1064                 assertEquals(2, leaderActorContext.getLastApplied());
1065
1066                 assertEquals(2, applyLogEntries.getToIndex());
1067
1068                 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1069                     ApplyState.class);
1070
1071                 assertEquals(1,applyStateList.size());
1072
1073                 ApplyState applyState = (ApplyState) applyStateList.get(0);
1074
1075                 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1076
1077             }};
1078     }
1079
1080     @Test
1081     public void testHandleAppendEntriesReplyUnknownFollower(){
1082         new JavaTestKit(getSystem()) {
1083             {
1084
1085                 ActorRef leaderActor =
1086                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1087
1088                 MockRaftActorContext leaderActorContext =
1089                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1090
1091                 Leader leader = new Leader(leaderActorContext);
1092
1093                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1094
1095                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1096
1097                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1098
1099             }};
1100     }
1101
1102     @Test
1103     public void testHandleRequestVoteReply(){
1104         new JavaTestKit(getSystem()) {
1105             {
1106
1107                 ActorRef leaderActor =
1108                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1109
1110                 MockRaftActorContext leaderActorContext =
1111                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1112
1113                 Leader leader = new Leader(leaderActorContext);
1114
1115                 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1116
1117                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1118
1119                 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1120
1121                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1122             }};
1123     }
1124
1125     @Test
1126     public void testIsolatedLeaderCheckNoFollowers() {
1127         new JavaTestKit(getSystem()) {{
1128             ActorRef leaderActor = getTestActor();
1129
1130             MockRaftActorContext leaderActorContext =
1131                 new MockRaftActorContext("leader", getSystem(), leaderActor);
1132
1133             Map<String, String> peerAddresses = new HashMap<>();
1134             leaderActorContext.setPeerAddresses(peerAddresses);
1135
1136             Leader leader = new Leader(leaderActorContext);
1137             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1138             Assert.assertTrue(behavior instanceof Leader);
1139         }};
1140     }
1141
1142     @Test
1143     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1144         new JavaTestKit(getSystem()) {{
1145
1146             ActorRef followerActor1 = getTestActor();
1147             ActorRef followerActor2 = getTestActor();
1148
1149             MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1150
1151             Map<String, String> peerAddresses = new HashMap<>();
1152             peerAddresses.put("follower-1", followerActor1.path().toString());
1153             peerAddresses.put("follower-2", followerActor2.path().toString());
1154
1155             leaderActorContext.setPeerAddresses(peerAddresses);
1156
1157             Leader leader = new Leader(leaderActorContext);
1158             leader.stopIsolatedLeaderCheckSchedule();
1159
1160             leader.markFollowerActive("follower-1");
1161             leader.markFollowerActive("follower-2");
1162             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1163             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1164                 behavior instanceof Leader);
1165
1166             // kill 1 follower and verify if that got killed
1167             final JavaTestKit probe = new JavaTestKit(getSystem());
1168             probe.watch(followerActor1);
1169             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1170             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1171             assertEquals(termMsg1.getActor(), followerActor1);
1172
1173             leader.markFollowerInActive("follower-1");
1174             leader.markFollowerActive("follower-2");
1175             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1176             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1177                 behavior instanceof Leader);
1178
1179             // kill 2nd follower and leader should change to Isolated leader
1180             followerActor2.tell(PoisonPill.getInstance(), null);
1181             probe.watch(followerActor2);
1182             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1183             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1184             assertEquals(termMsg2.getActor(), followerActor2);
1185
1186             leader.markFollowerInActive("follower-2");
1187             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1188             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1189                 behavior instanceof IsolatedLeader);
1190
1191         }};
1192     }
1193
1194
1195     @Test
1196     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1197         new JavaTestKit(getSystem()) {{
1198
1199             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
1200
1201             MockRaftActorContext leaderActorContext =
1202                 new MockRaftActorContext("leader", getSystem(), leaderActor);
1203
1204             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1205             configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1206             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1207
1208             leaderActorContext.setConfigParams(configParams);
1209
1210             ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
1211
1212             MockRaftActorContext followerActorContext =
1213                 new MockRaftActorContext("follower-reply", getSystem(), followerActor);
1214
1215             followerActorContext.setConfigParams(configParams);
1216
1217             Follower follower = new Follower(followerActorContext);
1218
1219             ForwardMessageToBehaviorActor.setBehavior(follower);
1220
1221             Map<String, String> peerAddresses = new HashMap<>();
1222             peerAddresses.put("follower-reply",
1223                 followerActor.path().toString());
1224
1225             leaderActorContext.setPeerAddresses(peerAddresses);
1226
1227             leaderActorContext.getReplicatedLog().removeFrom(0);
1228
1229             //create 3 entries
1230             leaderActorContext.setReplicatedLog(
1231                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1232
1233             leaderActorContext.setCommitIndex(1);
1234
1235             Leader leader = new Leader(leaderActorContext);
1236             leader.markFollowerActive("follower-reply");
1237
1238             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1239                 TimeUnit.MILLISECONDS);
1240
1241             leader.handleMessage(leaderActor, new SendHeartBeat());
1242
1243             AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
1244                 .getFirstMatching(followerActor, AppendEntries.class);
1245
1246             assertNotNull(appendEntries);
1247
1248             assertEquals(1, appendEntries.getLeaderCommit());
1249             assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1250             assertEquals(0, appendEntries.getPrevLogIndex());
1251
1252             AppendEntriesReply appendEntriesReply =
1253                 (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
1254
1255             assertNotNull(appendEntriesReply);
1256
1257             leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1258
1259             List<Object> entries = ForwardMessageToBehaviorActor
1260                 .getAllMatching(followerActor, AppendEntries.class);
1261
1262             assertEquals("AppendEntries count should be 2 ", 2, entries.size());
1263
1264             AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
1265
1266             assertEquals(1, appendEntriesSecond.getLeaderCommit());
1267             assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
1268             assertEquals(1, appendEntriesSecond.getPrevLogIndex());
1269
1270         }};
1271     }
1272
1273     class MockLeader extends Leader {
1274
1275         FollowerToSnapshot fts;
1276
1277         public MockLeader(RaftActorContext context){
1278             super(context);
1279         }
1280
1281         public FollowerToSnapshot getFollowerToSnapshot() {
1282             return fts;
1283         }
1284
1285         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1286             fts = new FollowerToSnapshot(bs);
1287             setFollowerSnapshot(followerId, fts);
1288         }
1289     }
1290
1291     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1292
1293         private final long electionTimeOutIntervalMillis;
1294         private final int snapshotChunkSize;
1295
1296         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1297             super();
1298             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1299             this.snapshotChunkSize = snapshotChunkSize;
1300         }
1301
1302         @Override
1303         public FiniteDuration getElectionTimeOutInterval() {
1304             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1305         }
1306
1307         @Override
1308         public int getSnapshotChunkSize() {
1309             return snapshotChunkSize;
1310         }
1311     }
1312 }