Merge "Optimize install snapshot, by making a call at end of installsnapshotreply"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Assert;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftState;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
47 import scala.concurrent.duration.FiniteDuration;
48
49 public class LeaderTest extends AbstractRaftActorBehaviorTest {
50
51     private final ActorRef leaderActor =
52         getSystem().actorOf(Props.create(DoNothingActor.class));
53     private final ActorRef senderActor =
54         getSystem().actorOf(Props.create(DoNothingActor.class));
55
56     @Test
57     public void testHandleMessageForUnknownMessage() throws Exception {
58         new JavaTestKit(getSystem()) {{
59             Leader leader =
60                 new Leader(createActorContext());
61
62             // handle message should return the Leader state when it receives an
63             // unknown message
64             RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
65             Assert.assertTrue(behavior instanceof Leader);
66         }};
67     }
68
69     @Test
70     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
71         new JavaTestKit(getSystem()) {{
72
73             new Within(duration("1 seconds")) {
74                 @Override
75                 protected void run() {
76
77                     ActorRef followerActor = getTestActor();
78
79                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
80
81                     Map<String, String> peerAddresses = new HashMap<>();
82
83                     peerAddresses.put(followerActor.path().toString(),
84                         followerActor.path().toString());
85
86                     actorContext.setPeerAddresses(peerAddresses);
87
88                     Leader leader = new Leader(actorContext);
89                     leader.markFollowerActive(followerActor.path().toString());
90                     Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
91                         TimeUnit.MILLISECONDS);
92                     leader.handleMessage(senderActor, new SendHeartBeat());
93
94                     final String out =
95                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
96                             // do not put code outside this method, will run afterwards
97                             @Override
98                             protected String match(Object in) {
99                                 Object msg = fromSerializableMessage(in);
100                                 if (msg instanceof AppendEntries) {
101                                     if (((AppendEntries)msg).getTerm() == 0) {
102                                         return "match";
103                                     }
104                                     return null;
105                                 } else {
106                                     throw noMatch();
107                                 }
108                             }
109                         }.get(); // this extracts the received message
110
111                     assertEquals("match", out);
112
113                 }
114             };
115         }};
116     }
117
118     @Test
119     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
120         new JavaTestKit(getSystem()) {{
121
122             new Within(duration("1 seconds")) {
123                 @Override
124                 protected void run() {
125
126                     ActorRef followerActor = getTestActor();
127
128                     MockRaftActorContext actorContext =
129                         (MockRaftActorContext) createActorContext();
130
131                     Map<String, String> peerAddresses = new HashMap<>();
132
133                     peerAddresses.put(followerActor.path().toString(),
134                             followerActor.path().toString());
135
136                     actorContext.setPeerAddresses(peerAddresses);
137
138                     Leader leader = new Leader(actorContext);
139                     leader.markFollowerActive(followerActor.path().toString());
140                     Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
141                         TimeUnit.MILLISECONDS);
142                     RaftActorBehavior raftBehavior = leader
143                         .handleMessage(senderActor, new Replicate(null, null,
144                             new MockRaftActorContext.MockReplicatedLogEntry(1,
145                                 100,
146                                 new MockRaftActorContext.MockPayload("foo"))
147                         ));
148
149                     // State should not change
150                     assertTrue(raftBehavior instanceof Leader);
151
152                     final String out =
153                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
154                             // do not put code outside this method, will run afterwards
155                             @Override
156                             protected String match(Object in) {
157                                 Object msg = fromSerializableMessage(in);
158                                 if (msg instanceof AppendEntries) {
159                                     if (((AppendEntries)msg).getTerm() == 0) {
160                                         return "match";
161                                     }
162                                     return null;
163                                 } else {
164                                     throw noMatch();
165                                 }
166                             }
167                         }.get(); // this extracts the received message
168
169                     assertEquals("match", out);
170                 }
171             };
172         }};
173     }
174
175     @Test
176     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
177         new JavaTestKit(getSystem()) {{
178
179             new Within(duration("1 seconds")) {
180                 @Override
181                 protected void run() {
182
183                     ActorRef raftActor = getTestActor();
184
185                     MockRaftActorContext actorContext =
186                         new MockRaftActorContext("test", getSystem(), raftActor);
187
188                     actorContext.getReplicatedLog().removeFrom(0);
189
190                     actorContext.setReplicatedLog(
191                         new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
192                             .build());
193
194                     Leader leader = new Leader(actorContext);
195                     RaftActorBehavior raftBehavior = leader
196                         .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
197
198                     // State should not change
199                     assertTrue(raftBehavior instanceof Leader);
200
201                     assertEquals(1, actorContext.getCommitIndex());
202
203                     final String out =
204                         new ExpectMsg<String>(duration("1 seconds"),
205                             "match hint") {
206                             // do not put code outside this method, will run afterwards
207                             @Override
208                             protected String match(Object in) {
209                                 if (in instanceof ApplyState) {
210                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
211                                         return "match";
212                                     }
213                                     return null;
214                                 } else {
215                                     throw noMatch();
216                                 }
217                             }
218                         }.get(); // this extracts the received message
219
220                     assertEquals("match", out);
221
222                 }
223             };
224         }};
225     }
226
227     @Test
228     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
229         new JavaTestKit(getSystem()) {{
230             ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
231
232             Map<String, String> peerAddresses = new HashMap<>();
233             peerAddresses.put(followerActor.path().toString(),
234                 followerActor.path().toString());
235
236             MockRaftActorContext actorContext =
237                 (MockRaftActorContext) createActorContext(leaderActor);
238             actorContext.setPeerAddresses(peerAddresses);
239
240             Map<String, String> leadersSnapshot = new HashMap<>();
241             leadersSnapshot.put("1", "A");
242             leadersSnapshot.put("2", "B");
243             leadersSnapshot.put("3", "C");
244
245             //clears leaders log
246             actorContext.getReplicatedLog().removeFrom(0);
247
248             final int followersLastIndex = 2;
249             final int snapshotIndex = 3;
250             final int newEntryIndex = 4;
251             final int snapshotTerm = 1;
252             final int currentTerm = 2;
253
254             // set the snapshot variables in replicatedlog
255             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
256             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
257             actorContext.setCommitIndex(followersLastIndex);
258             //set follower timeout to 2 mins, helps during debugging
259             actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
260
261             MockLeader leader = new MockLeader(actorContext);
262
263             // new entry
264             ReplicatedLogImplEntry entry =
265                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
266                     new MockRaftActorContext.MockPayload("D"));
267
268             //update follower timestamp
269             leader.markFollowerActive(followerActor.path().toString());
270
271             ByteString bs = toByteString(leadersSnapshot);
272             leader.setSnapshot(Optional.of(bs));
273             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
274
275             //send first chunk and no InstallSnapshotReply received yet
276             leader.getFollowerToSnapshot().getNextChunk();
277             leader.getFollowerToSnapshot().incrementChunkIndex();
278
279             Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
280                 TimeUnit.MILLISECONDS);
281
282             leader.handleMessage(leaderActor, new SendHeartBeat());
283
284             AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
285                 followerActor, AppendEntries.class);
286
287             assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
288                 "received", aeproto);
289
290             AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
291
292             assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
293
294             //InstallSnapshotReply received
295             leader.getFollowerToSnapshot().markSendStatus(true);
296
297             leader.handleMessage(senderActor, new SendHeartBeat());
298
299             InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
300                 MessageCollectorActor.getFirstMatching(followerActor,
301                     InstallSnapshot.SERIALIZABLE_CLASS);
302
303             assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
304                 isproto);
305
306             InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
307
308             assertEquals(snapshotIndex, is.getLastIncludedIndex());
309
310         }};
311     }
312
313     @Test
314     public void testSendAppendEntriesSnapshotScenario() {
315         new JavaTestKit(getSystem()) {{
316
317             ActorRef followerActor = getTestActor();
318
319             Map<String, String> peerAddresses = new HashMap<>();
320             peerAddresses.put(followerActor.path().toString(),
321                 followerActor.path().toString());
322
323             MockRaftActorContext actorContext =
324                 (MockRaftActorContext) createActorContext(getRef());
325             actorContext.setPeerAddresses(peerAddresses);
326
327             Map<String, String> leadersSnapshot = new HashMap<>();
328             leadersSnapshot.put("1", "A");
329             leadersSnapshot.put("2", "B");
330             leadersSnapshot.put("3", "C");
331
332             //clears leaders log
333             actorContext.getReplicatedLog().removeFrom(0);
334
335             final int followersLastIndex = 2;
336             final int snapshotIndex = 3;
337             final int newEntryIndex = 4;
338             final int snapshotTerm = 1;
339             final int currentTerm = 2;
340
341             // set the snapshot variables in replicatedlog
342             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
343             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
344             actorContext.setCommitIndex(followersLastIndex);
345
346             Leader leader = new Leader(actorContext);
347
348             // new entry
349             ReplicatedLogImplEntry entry =
350                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
351                     new MockRaftActorContext.MockPayload("D"));
352
353             //update follower timestamp
354             leader.markFollowerActive(followerActor.path().toString());
355
356             Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
357                 TimeUnit.MILLISECONDS);
358
359             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
360             RaftActorBehavior raftBehavior = leader.handleMessage(
361                 senderActor, new Replicate(null, "state-id", entry));
362
363             assertTrue(raftBehavior instanceof Leader);
364
365             // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
366             Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
367                 @Override
368                 protected Boolean match(Object o) throws Exception {
369                     if (o instanceof InitiateInstallSnapshot) {
370                         return true;
371                     }
372                     return false;
373                 }
374             }.get();
375
376             boolean initiateInitiateInstallSnapshot = false;
377             for (Boolean b: matches) {
378                 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
379             }
380
381             assertTrue(initiateInitiateInstallSnapshot);
382         }};
383     }
384
385     @Test
386     public void testInitiateInstallSnapshot() throws Exception {
387         new JavaTestKit(getSystem()) {{
388
389             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
390
391             ActorRef followerActor = getTestActor();
392
393             Map<String, String> peerAddresses = new HashMap<>();
394             peerAddresses.put(followerActor.path().toString(),
395                 followerActor.path().toString());
396
397
398             MockRaftActorContext actorContext =
399                 (MockRaftActorContext) createActorContext(leaderActor);
400             actorContext.setPeerAddresses(peerAddresses);
401
402             Map<String, String> leadersSnapshot = new HashMap<>();
403             leadersSnapshot.put("1", "A");
404             leadersSnapshot.put("2", "B");
405             leadersSnapshot.put("3", "C");
406
407             //clears leaders log
408             actorContext.getReplicatedLog().removeFrom(0);
409
410             final int followersLastIndex = 2;
411             final int snapshotIndex = 3;
412             final int newEntryIndex = 4;
413             final int snapshotTerm = 1;
414             final int currentTerm = 2;
415
416             // set the snapshot variables in replicatedlog
417             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
418             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
419             actorContext.setLastApplied(3);
420             actorContext.setCommitIndex(followersLastIndex);
421
422             Leader leader = new Leader(actorContext);
423             // set the snapshot as absent and check if capture-snapshot is invoked.
424             leader.setSnapshot(Optional.<ByteString>absent());
425
426             // new entry
427             ReplicatedLogImplEntry entry =
428                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
429                     new MockRaftActorContext.MockPayload("D"));
430
431             actorContext.getReplicatedLog().append(entry);
432
433             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
434             RaftActorBehavior raftBehavior = leader.handleMessage(
435                 leaderActor, new InitiateInstallSnapshot());
436
437             CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
438                 getFirstMatching(leaderActor, CaptureSnapshot.class);
439
440             assertNotNull(cs);
441
442             assertTrue(cs.isInstallSnapshotInitiated());
443             assertEquals(3, cs.getLastAppliedIndex());
444             assertEquals(1, cs.getLastAppliedTerm());
445             assertEquals(4, cs.getLastIndex());
446             assertEquals(2, cs.getLastTerm());
447         }};
448     }
449
450     @Test
451     public void testInstallSnapshot() {
452         new JavaTestKit(getSystem()) {{
453
454             ActorRef followerActor = getTestActor();
455
456             Map<String, String> peerAddresses = new HashMap<>();
457             peerAddresses.put(followerActor.path().toString(),
458                 followerActor.path().toString());
459
460             MockRaftActorContext actorContext =
461                 (MockRaftActorContext) createActorContext();
462             actorContext.setPeerAddresses(peerAddresses);
463
464
465             Map<String, String> leadersSnapshot = new HashMap<>();
466             leadersSnapshot.put("1", "A");
467             leadersSnapshot.put("2", "B");
468             leadersSnapshot.put("3", "C");
469
470             //clears leaders log
471             actorContext.getReplicatedLog().removeFrom(0);
472
473             final int followersLastIndex = 2;
474             final int snapshotIndex = 3;
475             final int newEntryIndex = 4;
476             final int snapshotTerm = 1;
477             final int currentTerm = 2;
478
479             // set the snapshot variables in replicatedlog
480             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
481             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
482             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
483             actorContext.setCommitIndex(followersLastIndex);
484
485             Leader leader = new Leader(actorContext);
486
487             // new entry
488             ReplicatedLogImplEntry entry =
489                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
490                     new MockRaftActorContext.MockPayload("D"));
491
492             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
493                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
494
495             assertTrue(raftBehavior instanceof Leader);
496
497             // check if installsnapshot gets called with the correct values.
498             final String out =
499                 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
500                     // do not put code outside this method, will run afterwards
501                     @Override
502                     protected String match(Object in) {
503                         if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
504                             InstallSnapshot is = (InstallSnapshot)
505                                 SerializationUtils.fromSerializable(in);
506                             if (is.getData() == null) {
507                                 return "InstallSnapshot data is null";
508                             }
509                             if (is.getLastIncludedIndex() != snapshotIndex) {
510                                 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
511                             }
512                             if (is.getLastIncludedTerm() != snapshotTerm) {
513                                 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
514                             }
515                             if (is.getTerm() == currentTerm) {
516                                 return is.getTerm() + "!=" + currentTerm;
517                             }
518
519                             return "match";
520
521                         } else {
522                             return "message mismatch:" + in.getClass();
523                         }
524                     }
525                 }.get(); // this extracts the received message
526
527             assertEquals("match", out);
528         }};
529     }
530
531     @Test
532     public void testHandleInstallSnapshotReplyLastChunk() {
533         new JavaTestKit(getSystem()) {{
534
535             ActorRef followerActor = getTestActor();
536
537             Map<String, String> peerAddresses = new HashMap<>();
538             peerAddresses.put(followerActor.path().toString(),
539                 followerActor.path().toString());
540
541             final int followersLastIndex = 2;
542             final int snapshotIndex = 3;
543             final int newEntryIndex = 4;
544             final int snapshotTerm = 1;
545             final int currentTerm = 2;
546
547             MockRaftActorContext actorContext =
548                 (MockRaftActorContext) createActorContext();
549             actorContext.setPeerAddresses(peerAddresses);
550             actorContext.setCommitIndex(followersLastIndex);
551
552             MockLeader leader = new MockLeader(actorContext);
553
554             Map<String, String> leadersSnapshot = new HashMap<>();
555             leadersSnapshot.put("1", "A");
556             leadersSnapshot.put("2", "B");
557             leadersSnapshot.put("3", "C");
558
559             // set the snapshot variables in replicatedlog
560
561             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
562             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
563             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
564
565             ByteString bs = toByteString(leadersSnapshot);
566             leader.setSnapshot(Optional.of(bs));
567             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
568             while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
569                 leader.getFollowerToSnapshot().getNextChunk();
570                 leader.getFollowerToSnapshot().incrementChunkIndex();
571             }
572
573             //clears leaders log
574             actorContext.getReplicatedLog().removeFrom(0);
575
576             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
577                 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
578                     leader.getFollowerToSnapshot().getChunkIndex(), true));
579
580             assertTrue(raftBehavior instanceof Leader);
581
582             assertEquals(0, leader.followerSnapshotSize());
583             assertEquals(1, leader.followerLogSize());
584             assertNotNull(leader.getFollower(followerActor.path().toString()));
585             FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
586             assertEquals(snapshotIndex, fli.getMatchIndex());
587             assertEquals(snapshotIndex, fli.getMatchIndex());
588             assertEquals(snapshotIndex + 1, fli.getNextIndex());
589         }};
590     }
591     @Test
592     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
593         new JavaTestKit(getSystem()) {{
594
595             TestActorRef<MessageCollectorActor> followerActor =
596                 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
597
598             Map<String, String> peerAddresses = new HashMap<>();
599             peerAddresses.put("follower-reply",
600                 followerActor.path().toString());
601
602             final int followersLastIndex = 2;
603             final int snapshotIndex = 3;
604             final int snapshotTerm = 1;
605             final int currentTerm = 2;
606
607             MockRaftActorContext actorContext =
608                 (MockRaftActorContext) createActorContext();
609             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
610                 @Override
611                 public int getSnapshotChunkSize() {
612                     return 50;
613                 }
614             };
615             configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
616             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
617
618             actorContext.setConfigParams(configParams);
619             actorContext.setPeerAddresses(peerAddresses);
620             actorContext.setCommitIndex(followersLastIndex);
621
622             MockLeader leader = new MockLeader(actorContext);
623
624             Map<String, String> leadersSnapshot = new HashMap<>();
625             leadersSnapshot.put("1", "A");
626             leadersSnapshot.put("2", "B");
627             leadersSnapshot.put("3", "C");
628
629             // set the snapshot variables in replicatedlog
630             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
631             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
632             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
633
634             ByteString bs = toByteString(leadersSnapshot);
635             leader.setSnapshot(Optional.of(bs));
636
637             leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
638
639             List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
640                 InstallSnapshotMessages.InstallSnapshot.class);
641
642             assertEquals(1, objectList.size());
643
644             Object o = objectList.get(0);
645             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
646
647             InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
648
649             assertEquals(1, installSnapshot.getChunkIndex());
650             assertEquals(3, installSnapshot.getTotalChunks());
651
652             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
653                 "follower-reply", installSnapshot.getChunkIndex(), true));
654
655             objectList = MessageCollectorActor.getAllMatching(followerActor,
656                 InstallSnapshotMessages.InstallSnapshot.class);
657
658             assertEquals(2, objectList.size());
659
660             installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
661
662             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
663                 "follower-reply", installSnapshot.getChunkIndex(), true));
664
665             objectList = MessageCollectorActor.getAllMatching(followerActor,
666                 InstallSnapshotMessages.InstallSnapshot.class);
667
668             assertEquals(3, objectList.size());
669
670             installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
671
672             // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
673             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
674                 "follower-reply", installSnapshot.getChunkIndex(), true));
675
676             objectList = MessageCollectorActor.getAllMatching(followerActor,
677                 InstallSnapshotMessages.InstallSnapshot.class);
678
679             // Count should still stay at 3
680             assertEquals(3, objectList.size());
681         }};
682     }
683
684
685     @Test
686     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
687         new JavaTestKit(getSystem()) {{
688
689             TestActorRef<MessageCollectorActor> followerActor =
690                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
691
692             Map<String, String> peerAddresses = new HashMap<>();
693             peerAddresses.put(followerActor.path().toString(),
694                     followerActor.path().toString());
695
696             final int followersLastIndex = 2;
697             final int snapshotIndex = 3;
698             final int snapshotTerm = 1;
699             final int currentTerm = 2;
700
701             MockRaftActorContext actorContext =
702                     (MockRaftActorContext) createActorContext();
703
704             actorContext.setConfigParams(new DefaultConfigParamsImpl(){
705                 @Override
706                 public int getSnapshotChunkSize() {
707                     return 50;
708                 }
709             });
710             actorContext.setPeerAddresses(peerAddresses);
711             actorContext.setCommitIndex(followersLastIndex);
712
713             MockLeader leader = new MockLeader(actorContext);
714
715             Map<String, String> leadersSnapshot = new HashMap<>();
716             leadersSnapshot.put("1", "A");
717             leadersSnapshot.put("2", "B");
718             leadersSnapshot.put("3", "C");
719
720             // set the snapshot variables in replicatedlog
721             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
722             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
723             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
724
725             ByteString bs = toByteString(leadersSnapshot);
726             leader.setSnapshot(Optional.of(bs));
727
728             leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
729
730             Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
731
732             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
733
734             InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
735
736             assertEquals(1, installSnapshot.getChunkIndex());
737             assertEquals(3, installSnapshot.getTotalChunks());
738
739
740             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
741                 followerActor.path().toString(), -1, false));
742
743             Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
744                 TimeUnit.MILLISECONDS);
745
746             leader.handleMessage(leaderActor, new SendHeartBeat());
747
748             o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
749
750             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
751
752             installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
753
754             assertEquals(1, installSnapshot.getChunkIndex());
755             assertEquals(3, installSnapshot.getTotalChunks());
756
757             followerActor.tell(PoisonPill.getInstance(), getRef());
758         }};
759     }
760
761     @Test
762     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
763         new JavaTestKit(getSystem()) {
764             {
765
766                 TestActorRef<MessageCollectorActor> followerActor =
767                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
768
769                 Map<String, String> peerAddresses = new HashMap<>();
770                 peerAddresses.put(followerActor.path().toString(),
771                         followerActor.path().toString());
772
773                 final int followersLastIndex = 2;
774                 final int snapshotIndex = 3;
775                 final int snapshotTerm = 1;
776                 final int currentTerm = 2;
777
778                 MockRaftActorContext actorContext =
779                         (MockRaftActorContext) createActorContext();
780
781                 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
782                     @Override
783                     public int getSnapshotChunkSize() {
784                         return 50;
785                     }
786                 });
787                 actorContext.setPeerAddresses(peerAddresses);
788                 actorContext.setCommitIndex(followersLastIndex);
789
790                 MockLeader leader = new MockLeader(actorContext);
791
792                 Map<String, String> leadersSnapshot = new HashMap<>();
793                 leadersSnapshot.put("1", "A");
794                 leadersSnapshot.put("2", "B");
795                 leadersSnapshot.put("3", "C");
796
797                 // set the snapshot variables in replicatedlog
798                 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
799                 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
800                 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
801
802                 ByteString bs = toByteString(leadersSnapshot);
803                 leader.setSnapshot(Optional.of(bs));
804
805                 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
806
807                 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
808
809                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
810
811                 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
812
813                 assertEquals(1, installSnapshot.getChunkIndex());
814                 assertEquals(3, installSnapshot.getTotalChunks());
815                 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
816
817                 int hashCode = installSnapshot.getData().hashCode();
818
819                 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
820
821                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
822
823                 leader.handleMessage(leaderActor, new SendHeartBeat());
824
825                 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
826
827                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
828
829                 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
830
831                 assertEquals(2, installSnapshot.getChunkIndex());
832                 assertEquals(3, installSnapshot.getTotalChunks());
833                 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
834
835                 followerActor.tell(PoisonPill.getInstance(), getRef());
836             }};
837     }
838
839     @Test
840     public void testFollowerToSnapshotLogic() {
841
842         MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
843
844         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
845             @Override
846             public int getSnapshotChunkSize() {
847                 return 50;
848             }
849         });
850
851         MockLeader leader = new MockLeader(actorContext);
852
853         Map<String, String> leadersSnapshot = new HashMap<>();
854         leadersSnapshot.put("1", "A");
855         leadersSnapshot.put("2", "B");
856         leadersSnapshot.put("3", "C");
857
858         ByteString bs = toByteString(leadersSnapshot);
859         byte[] barray = bs.toByteArray();
860
861         leader.createFollowerToSnapshot("followerId", bs);
862         assertEquals(bs.size(), barray.length);
863
864         int chunkIndex=0;
865         for (int i=0; i < barray.length; i = i + 50) {
866             int j = i + 50;
867             chunkIndex++;
868
869             if (i + 50 > barray.length) {
870                 j = barray.length;
871             }
872
873             ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
874             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
875             assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
876
877             leader.getFollowerToSnapshot().markSendStatus(true);
878             if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
879                 leader.getFollowerToSnapshot().incrementChunkIndex();
880             }
881         }
882
883         assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
884     }
885
886
887     @Override protected RaftActorBehavior createBehavior(
888         RaftActorContext actorContext) {
889         return new Leader(actorContext);
890     }
891
892     @Override protected RaftActorContext createActorContext() {
893         return createActorContext(leaderActor);
894     }
895
896     @Override
897     protected RaftActorContext createActorContext(ActorRef actorRef) {
898         return new MockRaftActorContext("test", getSystem(), actorRef);
899     }
900
901     private ByteString toByteString(Map<String, String> state) {
902         ByteArrayOutputStream b = null;
903         ObjectOutputStream o = null;
904         try {
905             try {
906                 b = new ByteArrayOutputStream();
907                 o = new ObjectOutputStream(b);
908                 o.writeObject(state);
909                 byte[] snapshotBytes = b.toByteArray();
910                 return ByteString.copyFrom(snapshotBytes);
911             } finally {
912                 if (o != null) {
913                     o.flush();
914                     o.close();
915                 }
916                 if (b != null) {
917                     b.close();
918                 }
919             }
920         } catch (IOException e) {
921             Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
922         }
923         return null;
924     }
925
926     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
927         private static AbstractRaftActorBehavior behavior;
928
929         public ForwardMessageToBehaviorActor(){
930
931         }
932
933         @Override public void onReceive(Object message) throws Exception {
934             super.onReceive(message);
935             behavior.handleMessage(sender(), message);
936         }
937
938         public static void setBehavior(AbstractRaftActorBehavior behavior){
939             ForwardMessageToBehaviorActor.behavior = behavior;
940         }
941     }
942
943     @Test
944     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
945         new JavaTestKit(getSystem()) {{
946
947             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
948
949             MockRaftActorContext leaderActorContext =
950                 new MockRaftActorContext("leader", getSystem(), leaderActor);
951
952             ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
953
954             MockRaftActorContext followerActorContext =
955                 new MockRaftActorContext("follower", getSystem(), followerActor);
956
957             Follower follower = new Follower(followerActorContext);
958
959             ForwardMessageToBehaviorActor.setBehavior(follower);
960
961             Map<String, String> peerAddresses = new HashMap<>();
962             peerAddresses.put(followerActor.path().toString(),
963                 followerActor.path().toString());
964
965             leaderActorContext.setPeerAddresses(peerAddresses);
966
967             leaderActorContext.getReplicatedLog().removeFrom(0);
968
969             //create 3 entries
970             leaderActorContext.setReplicatedLog(
971                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
972
973             leaderActorContext.setCommitIndex(1);
974
975             followerActorContext.getReplicatedLog().removeFrom(0);
976
977             // follower too has the exact same log entries and has the same commit index
978             followerActorContext.setReplicatedLog(
979                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
980
981             followerActorContext.setCommitIndex(1);
982
983             Leader leader = new Leader(leaderActorContext);
984             leader.markFollowerActive(followerActor.path().toString());
985
986             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
987                 TimeUnit.MILLISECONDS);
988
989             leader.handleMessage(leaderActor, new SendHeartBeat());
990
991             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
992                     .getFirstMatching(followerActor, AppendEntries.class);
993
994             assertNotNull(appendEntries);
995
996             assertEquals(1, appendEntries.getLeaderCommit());
997             assertEquals(1, appendEntries.getEntries().get(0).getIndex());
998             assertEquals(0, appendEntries.getPrevLogIndex());
999
1000             AppendEntriesReply appendEntriesReply =
1001                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
1002                     leaderActor, AppendEntriesReply.class);
1003
1004             assertNotNull(appendEntriesReply);
1005
1006             // follower returns its next index
1007             assertEquals(2, appendEntriesReply.getLogLastIndex());
1008             assertEquals(1, appendEntriesReply.getLogLastTerm());
1009
1010         }};
1011     }
1012
1013
1014     @Test
1015     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1016         new JavaTestKit(getSystem()) {{
1017
1018             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
1019
1020             MockRaftActorContext leaderActorContext =
1021                 new MockRaftActorContext("leader", getSystem(), leaderActor);
1022
1023             ActorRef followerActor = getSystem().actorOf(
1024                 Props.create(ForwardMessageToBehaviorActor.class));
1025
1026             MockRaftActorContext followerActorContext =
1027                 new MockRaftActorContext("follower", getSystem(), followerActor);
1028
1029             Follower follower = new Follower(followerActorContext);
1030
1031             ForwardMessageToBehaviorActor.setBehavior(follower);
1032
1033             Map<String, String> peerAddresses = new HashMap<>();
1034             peerAddresses.put(followerActor.path().toString(),
1035                 followerActor.path().toString());
1036
1037             leaderActorContext.setPeerAddresses(peerAddresses);
1038
1039             leaderActorContext.getReplicatedLog().removeFrom(0);
1040
1041             leaderActorContext.setReplicatedLog(
1042                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1043
1044             leaderActorContext.setCommitIndex(1);
1045
1046             followerActorContext.getReplicatedLog().removeFrom(0);
1047
1048             followerActorContext.setReplicatedLog(
1049                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1050
1051             // follower has the same log entries but its commit index > leaders commit index
1052             followerActorContext.setCommitIndex(2);
1053
1054             Leader leader = new Leader(leaderActorContext);
1055             leader.markFollowerActive(followerActor.path().toString());
1056
1057             Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
1058
1059             leader.handleMessage(leaderActor, new SendHeartBeat());
1060
1061             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
1062                     .getFirstMatching(followerActor, AppendEntries.class);
1063
1064             assertNotNull(appendEntries);
1065
1066             assertEquals(1, appendEntries.getLeaderCommit());
1067             assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1068             assertEquals(0, appendEntries.getPrevLogIndex());
1069
1070             AppendEntriesReply appendEntriesReply =
1071                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
1072                     leaderActor, AppendEntriesReply.class);
1073
1074             assertNotNull(appendEntriesReply);
1075
1076             assertEquals(2, appendEntriesReply.getLogLastIndex());
1077             assertEquals(1, appendEntriesReply.getLogLastTerm());
1078
1079         }};
1080     }
1081
1082     @Test
1083     public void testHandleAppendEntriesReplyFailure(){
1084         new JavaTestKit(getSystem()) {
1085             {
1086
1087                 ActorRef leaderActor =
1088                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1089
1090                 ActorRef followerActor =
1091                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1092
1093
1094                 MockRaftActorContext leaderActorContext =
1095                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1096
1097                 Map<String, String> peerAddresses = new HashMap<>();
1098                 peerAddresses.put("follower-1",
1099                     followerActor.path().toString());
1100
1101                 leaderActorContext.setPeerAddresses(peerAddresses);
1102
1103                 Leader leader = new Leader(leaderActorContext);
1104
1105                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1106
1107                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1108
1109                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1110
1111             }};
1112     }
1113
1114     @Test
1115     public void testHandleAppendEntriesReplySuccess() throws Exception {
1116         new JavaTestKit(getSystem()) {
1117             {
1118
1119                 ActorRef leaderActor =
1120                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1121
1122                 ActorRef followerActor =
1123                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1124
1125
1126                 MockRaftActorContext leaderActorContext =
1127                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1128
1129                 leaderActorContext.setReplicatedLog(
1130                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1131
1132                 Map<String, String> peerAddresses = new HashMap<>();
1133                 peerAddresses.put("follower-1",
1134                     followerActor.path().toString());
1135
1136                 leaderActorContext.setPeerAddresses(peerAddresses);
1137                 leaderActorContext.setCommitIndex(1);
1138                 leaderActorContext.setLastApplied(1);
1139                 leaderActorContext.getTermInformation().update(1, "leader");
1140
1141                 Leader leader = new Leader(leaderActorContext);
1142
1143                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1144
1145                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1146
1147                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1148
1149                 assertEquals(2, leaderActorContext.getCommitIndex());
1150
1151                 ApplyLogEntries applyLogEntries =
1152                     (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
1153                         ApplyLogEntries.class);
1154
1155                 assertNotNull(applyLogEntries);
1156
1157                 assertEquals(2, leaderActorContext.getLastApplied());
1158
1159                 assertEquals(2, applyLogEntries.getToIndex());
1160
1161                 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1162                     ApplyState.class);
1163
1164                 assertEquals(1,applyStateList.size());
1165
1166                 ApplyState applyState = (ApplyState) applyStateList.get(0);
1167
1168                 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1169
1170             }};
1171     }
1172
1173     @Test
1174     public void testHandleAppendEntriesReplyUnknownFollower(){
1175         new JavaTestKit(getSystem()) {
1176             {
1177
1178                 ActorRef leaderActor =
1179                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1180
1181                 MockRaftActorContext leaderActorContext =
1182                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1183
1184                 Leader leader = new Leader(leaderActorContext);
1185
1186                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1187
1188                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1189
1190                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1191
1192             }};
1193     }
1194
1195     @Test
1196     public void testHandleRequestVoteReply(){
1197         new JavaTestKit(getSystem()) {
1198             {
1199
1200                 ActorRef leaderActor =
1201                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1202
1203                 MockRaftActorContext leaderActorContext =
1204                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1205
1206                 Leader leader = new Leader(leaderActorContext);
1207
1208                 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1209
1210                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1211
1212                 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1213
1214                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1215             }};
1216     }
1217
1218     @Test
1219     public void testIsolatedLeaderCheckNoFollowers() {
1220         new JavaTestKit(getSystem()) {{
1221             ActorRef leaderActor = getTestActor();
1222
1223             MockRaftActorContext leaderActorContext =
1224                 new MockRaftActorContext("leader", getSystem(), leaderActor);
1225
1226             Map<String, String> peerAddresses = new HashMap<>();
1227             leaderActorContext.setPeerAddresses(peerAddresses);
1228
1229             Leader leader = new Leader(leaderActorContext);
1230             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1231             Assert.assertTrue(behavior instanceof Leader);
1232         }};
1233     }
1234
1235     @Test
1236     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1237         new JavaTestKit(getSystem()) {{
1238
1239             ActorRef followerActor1 = getTestActor();
1240             ActorRef followerActor2 = getTestActor();
1241
1242             MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1243
1244             Map<String, String> peerAddresses = new HashMap<>();
1245             peerAddresses.put("follower-1", followerActor1.path().toString());
1246             peerAddresses.put("follower-2", followerActor2.path().toString());
1247
1248             leaderActorContext.setPeerAddresses(peerAddresses);
1249
1250             Leader leader = new Leader(leaderActorContext);
1251             leader.stopIsolatedLeaderCheckSchedule();
1252
1253             leader.markFollowerActive("follower-1");
1254             leader.markFollowerActive("follower-2");
1255             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1256             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1257                 behavior instanceof Leader);
1258
1259             // kill 1 follower and verify if that got killed
1260             final JavaTestKit probe = new JavaTestKit(getSystem());
1261             probe.watch(followerActor1);
1262             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1263             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1264             assertEquals(termMsg1.getActor(), followerActor1);
1265
1266             leader.markFollowerInActive("follower-1");
1267             leader.markFollowerActive("follower-2");
1268             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1269             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1270                 behavior instanceof Leader);
1271
1272             // kill 2nd follower and leader should change to Isolated leader
1273             followerActor2.tell(PoisonPill.getInstance(), null);
1274             probe.watch(followerActor2);
1275             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1276             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1277             assertEquals(termMsg2.getActor(), followerActor2);
1278
1279             leader.markFollowerInActive("follower-2");
1280             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1281             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1282                 behavior instanceof IsolatedLeader);
1283
1284         }};
1285     }
1286
1287
1288     @Test
1289     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1290         new JavaTestKit(getSystem()) {{
1291
1292             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
1293
1294             MockRaftActorContext leaderActorContext =
1295                 new MockRaftActorContext("leader", getSystem(), leaderActor);
1296
1297             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1298             configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1299             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1300
1301             leaderActorContext.setConfigParams(configParams);
1302
1303             ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
1304
1305             MockRaftActorContext followerActorContext =
1306                 new MockRaftActorContext("follower-reply", getSystem(), followerActor);
1307
1308             followerActorContext.setConfigParams(configParams);
1309
1310             Follower follower = new Follower(followerActorContext);
1311
1312             ForwardMessageToBehaviorActor.setBehavior(follower);
1313
1314             Map<String, String> peerAddresses = new HashMap<>();
1315             peerAddresses.put("follower-reply",
1316                 followerActor.path().toString());
1317
1318             leaderActorContext.setPeerAddresses(peerAddresses);
1319
1320             leaderActorContext.getReplicatedLog().removeFrom(0);
1321
1322             //create 3 entries
1323             leaderActorContext.setReplicatedLog(
1324                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1325
1326             leaderActorContext.setCommitIndex(1);
1327
1328             Leader leader = new Leader(leaderActorContext);
1329             leader.markFollowerActive("follower-reply");
1330
1331             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1332                 TimeUnit.MILLISECONDS);
1333
1334             leader.handleMessage(leaderActor, new SendHeartBeat());
1335
1336             AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
1337                 .getFirstMatching(followerActor, AppendEntries.class);
1338
1339             assertNotNull(appendEntries);
1340
1341             assertEquals(1, appendEntries.getLeaderCommit());
1342             assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1343             assertEquals(0, appendEntries.getPrevLogIndex());
1344
1345             AppendEntriesReply appendEntriesReply =
1346                 (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
1347
1348             assertNotNull(appendEntriesReply);
1349
1350             leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1351
1352             List<Object> entries = ForwardMessageToBehaviorActor
1353                 .getAllMatching(followerActor, AppendEntries.class);
1354
1355             assertEquals("AppendEntries count should be 2 ", 2, entries.size());
1356
1357             AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
1358
1359             assertEquals(1, appendEntriesSecond.getLeaderCommit());
1360             assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
1361             assertEquals(1, appendEntriesSecond.getPrevLogIndex());
1362
1363         }};
1364     }
1365
1366     class MockLeader extends Leader {
1367
1368         FollowerToSnapshot fts;
1369
1370         public MockLeader(RaftActorContext context){
1371             super(context);
1372         }
1373
1374         public FollowerToSnapshot getFollowerToSnapshot() {
1375             return fts;
1376         }
1377
1378         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1379             fts = new FollowerToSnapshot(bs);
1380             setFollowerSnapshot(followerId, fts);
1381         }
1382     }
1383
1384     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1385
1386         private final long electionTimeOutIntervalMillis;
1387         private final int snapshotChunkSize;
1388
1389         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1390             super();
1391             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1392             this.snapshotChunkSize = snapshotChunkSize;
1393         }
1394
1395         @Override
1396         public FiniteDuration getElectionTimeOutInterval() {
1397             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1398         }
1399
1400         @Override
1401         public int getSnapshotChunkSize() {
1402             return snapshotChunkSize;
1403         }
1404     }
1405 }