Merge "BUG-2599 Netconf notification manager initial API and Impl"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import akka.actor.ActorRef;
4 import akka.actor.PoisonPill;
5 import akka.actor.Props;
6 import akka.actor.Terminated;
7 import akka.testkit.JavaTestKit;
8 import akka.testkit.TestActorRef;
9 import com.google.common.base.Optional;
10 import com.google.common.util.concurrent.Uninterruptibles;
11 import com.google.protobuf.ByteString;
12 import java.io.ByteArrayOutputStream;
13 import java.io.IOException;
14 import java.io.ObjectOutputStream;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.concurrent.TimeUnit;
19 import org.junit.Assert;
20 import org.junit.Test;
21 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
22 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
23 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
24 import org.opendaylight.controller.cluster.raft.RaftActorContext;
25 import org.opendaylight.controller.cluster.raft.RaftState;
26 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
27 import org.opendaylight.controller.cluster.raft.SerializationUtils;
28 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
30 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
31 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
32 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
33 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
34 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
36 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
37 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
38 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
40 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
41 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
42 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
43 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
44 import scala.concurrent.duration.FiniteDuration;
45
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertNotNull;
48 import static org.junit.Assert.assertTrue;
49
50 public class LeaderTest extends AbstractRaftActorBehaviorTest {
51
52     private final ActorRef leaderActor =
53         getSystem().actorOf(Props.create(DoNothingActor.class));
54     private final ActorRef senderActor =
55         getSystem().actorOf(Props.create(DoNothingActor.class));
56
57     @Test
58     public void testHandleMessageForUnknownMessage() throws Exception {
59         new JavaTestKit(getSystem()) {{
60             Leader leader =
61                 new Leader(createActorContext());
62
63             // handle message should return the Leader state when it receives an
64             // unknown message
65             RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
66             Assert.assertTrue(behavior instanceof Leader);
67         }};
68     }
69
70     @Test
71     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
72         new JavaTestKit(getSystem()) {{
73
74             new Within(duration("1 seconds")) {
75                 @Override
76                 protected void run() {
77
78                     ActorRef followerActor = getTestActor();
79
80                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
81
82                     Map<String, String> peerAddresses = new HashMap<>();
83
84                     peerAddresses.put(followerActor.path().toString(),
85                         followerActor.path().toString());
86
87                     actorContext.setPeerAddresses(peerAddresses);
88
89                     Leader leader = new Leader(actorContext);
90                     leader.markFollowerActive(followerActor.path().toString());
91                     Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
92                         TimeUnit.MILLISECONDS);
93                     leader.handleMessage(senderActor, new SendHeartBeat());
94
95                     final String out =
96                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
97                             // do not put code outside this method, will run afterwards
98                             @Override
99                             protected String match(Object in) {
100                                 Object msg = fromSerializableMessage(in);
101                                 if (msg instanceof AppendEntries) {
102                                     if (((AppendEntries)msg).getTerm() == 0) {
103                                         return "match";
104                                     }
105                                     return null;
106                                 } else {
107                                     throw noMatch();
108                                 }
109                             }
110                         }.get(); // this extracts the received message
111
112                     assertEquals("match", out);
113
114                 }
115             };
116         }};
117     }
118
119     @Test
120     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
121         new JavaTestKit(getSystem()) {{
122
123             new Within(duration("1 seconds")) {
124                 @Override
125                 protected void run() {
126
127                     ActorRef followerActor = getTestActor();
128
129                     MockRaftActorContext actorContext =
130                         (MockRaftActorContext) createActorContext();
131
132                     Map<String, String> peerAddresses = new HashMap<>();
133
134                     peerAddresses.put(followerActor.path().toString(),
135                             followerActor.path().toString());
136
137                     actorContext.setPeerAddresses(peerAddresses);
138
139                     Leader leader = new Leader(actorContext);
140                     leader.markFollowerActive(followerActor.path().toString());
141                     Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
142                         TimeUnit.MILLISECONDS);
143                     RaftActorBehavior raftBehavior = leader
144                         .handleMessage(senderActor, new Replicate(null, null,
145                             new MockRaftActorContext.MockReplicatedLogEntry(1,
146                                 100,
147                                 new MockRaftActorContext.MockPayload("foo"))
148                         ));
149
150                     // State should not change
151                     assertTrue(raftBehavior instanceof Leader);
152
153                     final String out =
154                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
155                             // do not put code outside this method, will run afterwards
156                             @Override
157                             protected String match(Object in) {
158                                 Object msg = fromSerializableMessage(in);
159                                 if (msg instanceof AppendEntries) {
160                                     if (((AppendEntries)msg).getTerm() == 0) {
161                                         return "match";
162                                     }
163                                     return null;
164                                 } else {
165                                     throw noMatch();
166                                 }
167                             }
168                         }.get(); // this extracts the received message
169
170                     assertEquals("match", out);
171                 }
172             };
173         }};
174     }
175
176     @Test
177     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
178         new JavaTestKit(getSystem()) {{
179
180             new Within(duration("1 seconds")) {
181                 @Override
182                 protected void run() {
183
184                     ActorRef raftActor = getTestActor();
185
186                     MockRaftActorContext actorContext =
187                         new MockRaftActorContext("test", getSystem(), raftActor);
188
189                     actorContext.getReplicatedLog().removeFrom(0);
190
191                     actorContext.setReplicatedLog(
192                         new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
193                             .build());
194
195                     Leader leader = new Leader(actorContext);
196                     RaftActorBehavior raftBehavior = leader
197                         .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
198
199                     // State should not change
200                     assertTrue(raftBehavior instanceof Leader);
201
202                     assertEquals(1, actorContext.getCommitIndex());
203
204                     final String out =
205                         new ExpectMsg<String>(duration("1 seconds"),
206                             "match hint") {
207                             // do not put code outside this method, will run afterwards
208                             @Override
209                             protected String match(Object in) {
210                                 if (in instanceof ApplyState) {
211                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
212                                         return "match";
213                                     }
214                                     return null;
215                                 } else {
216                                     throw noMatch();
217                                 }
218                             }
219                         }.get(); // this extracts the received message
220
221                     assertEquals("match", out);
222
223                 }
224             };
225         }};
226     }
227
228     @Test
229     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
230         new JavaTestKit(getSystem()) {{
231             ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
232
233             Map<String, String> peerAddresses = new HashMap<>();
234             peerAddresses.put(followerActor.path().toString(),
235                 followerActor.path().toString());
236
237             MockRaftActorContext actorContext =
238                 (MockRaftActorContext) createActorContext(leaderActor);
239             actorContext.setPeerAddresses(peerAddresses);
240
241             Map<String, String> leadersSnapshot = new HashMap<>();
242             leadersSnapshot.put("1", "A");
243             leadersSnapshot.put("2", "B");
244             leadersSnapshot.put("3", "C");
245
246             //clears leaders log
247             actorContext.getReplicatedLog().removeFrom(0);
248
249             final int followersLastIndex = 2;
250             final int snapshotIndex = 3;
251             final int newEntryIndex = 4;
252             final int snapshotTerm = 1;
253             final int currentTerm = 2;
254
255             // set the snapshot variables in replicatedlog
256             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
257             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
258             actorContext.setCommitIndex(followersLastIndex);
259             //set follower timeout to 2 mins, helps during debugging
260             actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
261
262             MockLeader leader = new MockLeader(actorContext);
263
264             // new entry
265             ReplicatedLogImplEntry entry =
266                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
267                     new MockRaftActorContext.MockPayload("D"));
268
269             //update follower timestamp
270             leader.markFollowerActive(followerActor.path().toString());
271
272             ByteString bs = toByteString(leadersSnapshot);
273             leader.setSnapshot(Optional.of(bs));
274             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
275
276             //send first chunk and no InstallSnapshotReply received yet
277             leader.getFollowerToSnapshot().getNextChunk();
278             leader.getFollowerToSnapshot().incrementChunkIndex();
279
280             Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
281                 TimeUnit.MILLISECONDS);
282
283             leader.handleMessage(leaderActor, new SendHeartBeat());
284
285             AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
286                 followerActor, AppendEntries.class);
287
288             assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
289                 "received", aeproto);
290
291             AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
292
293             assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
294
295             //InstallSnapshotReply received
296             leader.getFollowerToSnapshot().markSendStatus(true);
297
298             leader.handleMessage(senderActor, new SendHeartBeat());
299
300             InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
301                 MessageCollectorActor.getFirstMatching(followerActor,
302                     InstallSnapshot.SERIALIZABLE_CLASS);
303
304             assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
305                 isproto);
306
307             InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
308
309             assertEquals(snapshotIndex, is.getLastIncludedIndex());
310
311         }};
312     }
313
314     @Test
315     public void testSendAppendEntriesSnapshotScenario() {
316         new JavaTestKit(getSystem()) {{
317
318             ActorRef followerActor = getTestActor();
319
320             Map<String, String> peerAddresses = new HashMap<>();
321             peerAddresses.put(followerActor.path().toString(),
322                 followerActor.path().toString());
323
324             MockRaftActorContext actorContext =
325                 (MockRaftActorContext) createActorContext(getRef());
326             actorContext.setPeerAddresses(peerAddresses);
327
328             Map<String, String> leadersSnapshot = new HashMap<>();
329             leadersSnapshot.put("1", "A");
330             leadersSnapshot.put("2", "B");
331             leadersSnapshot.put("3", "C");
332
333             //clears leaders log
334             actorContext.getReplicatedLog().removeFrom(0);
335
336             final int followersLastIndex = 2;
337             final int snapshotIndex = 3;
338             final int newEntryIndex = 4;
339             final int snapshotTerm = 1;
340             final int currentTerm = 2;
341
342             // set the snapshot variables in replicatedlog
343             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
344             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
345             actorContext.setCommitIndex(followersLastIndex);
346
347             Leader leader = new Leader(actorContext);
348
349             // new entry
350             ReplicatedLogImplEntry entry =
351                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
352                     new MockRaftActorContext.MockPayload("D"));
353
354             //update follower timestamp
355             leader.markFollowerActive(followerActor.path().toString());
356
357             Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
358                 TimeUnit.MILLISECONDS);
359
360             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
361             RaftActorBehavior raftBehavior = leader.handleMessage(
362                 senderActor, new Replicate(null, "state-id", entry));
363
364             assertTrue(raftBehavior instanceof Leader);
365
366             // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
367             Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
368                 @Override
369                 protected Boolean match(Object o) throws Exception {
370                     if (o instanceof InitiateInstallSnapshot) {
371                         return true;
372                     }
373                     return false;
374                 }
375             }.get();
376
377             boolean initiateInitiateInstallSnapshot = false;
378             for (Boolean b: matches) {
379                 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
380             }
381
382             assertTrue(initiateInitiateInstallSnapshot);
383         }};
384     }
385
386     @Test
387     public void testInitiateInstallSnapshot() throws Exception {
388         new JavaTestKit(getSystem()) {{
389
390             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
391
392             ActorRef followerActor = getTestActor();
393
394             Map<String, String> peerAddresses = new HashMap<>();
395             peerAddresses.put(followerActor.path().toString(),
396                 followerActor.path().toString());
397
398
399             MockRaftActorContext actorContext =
400                 (MockRaftActorContext) createActorContext(leaderActor);
401             actorContext.setPeerAddresses(peerAddresses);
402
403             Map<String, String> leadersSnapshot = new HashMap<>();
404             leadersSnapshot.put("1", "A");
405             leadersSnapshot.put("2", "B");
406             leadersSnapshot.put("3", "C");
407
408             //clears leaders log
409             actorContext.getReplicatedLog().removeFrom(0);
410
411             final int followersLastIndex = 2;
412             final int snapshotIndex = 3;
413             final int newEntryIndex = 4;
414             final int snapshotTerm = 1;
415             final int currentTerm = 2;
416
417             // set the snapshot variables in replicatedlog
418             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
419             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
420             actorContext.setLastApplied(3);
421             actorContext.setCommitIndex(followersLastIndex);
422
423             Leader leader = new Leader(actorContext);
424             // set the snapshot as absent and check if capture-snapshot is invoked.
425             leader.setSnapshot(Optional.<ByteString>absent());
426
427             // new entry
428             ReplicatedLogImplEntry entry =
429                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
430                     new MockRaftActorContext.MockPayload("D"));
431
432             actorContext.getReplicatedLog().append(entry);
433
434             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
435             RaftActorBehavior raftBehavior = leader.handleMessage(
436                 leaderActor, new InitiateInstallSnapshot());
437
438             CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
439                 getFirstMatching(leaderActor, CaptureSnapshot.class);
440
441             assertNotNull(cs);
442
443             assertTrue(cs.isInstallSnapshotInitiated());
444             assertEquals(3, cs.getLastAppliedIndex());
445             assertEquals(1, cs.getLastAppliedTerm());
446             assertEquals(4, cs.getLastIndex());
447             assertEquals(2, cs.getLastTerm());
448
449             // if an initiate is started again when first is in progress, it shouldnt initiate Capture
450             raftBehavior = leader.handleMessage(leaderActor, new InitiateInstallSnapshot());
451             List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
452             assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
453
454         }};
455     }
456
457     @Test
458     public void testInstallSnapshot() {
459         new JavaTestKit(getSystem()) {{
460
461             ActorRef followerActor = getTestActor();
462
463             Map<String, String> peerAddresses = new HashMap<>();
464             peerAddresses.put(followerActor.path().toString(),
465                 followerActor.path().toString());
466
467             MockRaftActorContext actorContext =
468                 (MockRaftActorContext) createActorContext();
469             actorContext.setPeerAddresses(peerAddresses);
470
471
472             Map<String, String> leadersSnapshot = new HashMap<>();
473             leadersSnapshot.put("1", "A");
474             leadersSnapshot.put("2", "B");
475             leadersSnapshot.put("3", "C");
476
477             //clears leaders log
478             actorContext.getReplicatedLog().removeFrom(0);
479
480             final int followersLastIndex = 2;
481             final int snapshotIndex = 3;
482             final int newEntryIndex = 4;
483             final int snapshotTerm = 1;
484             final int currentTerm = 2;
485
486             // set the snapshot variables in replicatedlog
487             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
488             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
489             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
490             actorContext.setCommitIndex(followersLastIndex);
491
492             Leader leader = new Leader(actorContext);
493
494             // new entry
495             ReplicatedLogImplEntry entry =
496                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
497                     new MockRaftActorContext.MockPayload("D"));
498
499             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
500                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
501
502             assertTrue(raftBehavior instanceof Leader);
503
504             // check if installsnapshot gets called with the correct values.
505             final String out =
506                 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
507                     // do not put code outside this method, will run afterwards
508                     @Override
509                     protected String match(Object in) {
510                         if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
511                             InstallSnapshot is = (InstallSnapshot)
512                                 SerializationUtils.fromSerializable(in);
513                             if (is.getData() == null) {
514                                 return "InstallSnapshot data is null";
515                             }
516                             if (is.getLastIncludedIndex() != snapshotIndex) {
517                                 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
518                             }
519                             if (is.getLastIncludedTerm() != snapshotTerm) {
520                                 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
521                             }
522                             if (is.getTerm() == currentTerm) {
523                                 return is.getTerm() + "!=" + currentTerm;
524                             }
525
526                             return "match";
527
528                         } else {
529                             return "message mismatch:" + in.getClass();
530                         }
531                     }
532                 }.get(); // this extracts the received message
533
534             assertEquals("match", out);
535         }};
536     }
537
538     @Test
539     public void testHandleInstallSnapshotReplyLastChunk() {
540         new JavaTestKit(getSystem()) {{
541
542             ActorRef followerActor = getTestActor();
543
544             Map<String, String> peerAddresses = new HashMap<>();
545             peerAddresses.put(followerActor.path().toString(),
546                 followerActor.path().toString());
547
548             final int followersLastIndex = 2;
549             final int snapshotIndex = 3;
550             final int newEntryIndex = 4;
551             final int snapshotTerm = 1;
552             final int currentTerm = 2;
553
554             MockRaftActorContext actorContext =
555                 (MockRaftActorContext) createActorContext();
556             actorContext.setPeerAddresses(peerAddresses);
557             actorContext.setCommitIndex(followersLastIndex);
558
559             MockLeader leader = new MockLeader(actorContext);
560
561             Map<String, String> leadersSnapshot = new HashMap<>();
562             leadersSnapshot.put("1", "A");
563             leadersSnapshot.put("2", "B");
564             leadersSnapshot.put("3", "C");
565
566             // set the snapshot variables in replicatedlog
567
568             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
569             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
570             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
571
572             ByteString bs = toByteString(leadersSnapshot);
573             leader.setSnapshot(Optional.of(bs));
574             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
575             while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
576                 leader.getFollowerToSnapshot().getNextChunk();
577                 leader.getFollowerToSnapshot().incrementChunkIndex();
578             }
579
580             //clears leaders log
581             actorContext.getReplicatedLog().removeFrom(0);
582
583             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
584                 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
585                     leader.getFollowerToSnapshot().getChunkIndex(), true));
586
587             assertTrue(raftBehavior instanceof Leader);
588
589             assertEquals(0, leader.followerSnapshotSize());
590             assertEquals(1, leader.followerLogSize());
591             assertNotNull(leader.getFollower(followerActor.path().toString()));
592             FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
593             assertEquals(snapshotIndex, fli.getMatchIndex());
594             assertEquals(snapshotIndex, fli.getMatchIndex());
595             assertEquals(snapshotIndex + 1, fli.getNextIndex());
596         }};
597     }
598     @Test
599     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
600         new JavaTestKit(getSystem()) {{
601
602             TestActorRef<MessageCollectorActor> followerActor =
603                 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
604
605             Map<String, String> peerAddresses = new HashMap<>();
606             peerAddresses.put("follower-reply",
607                 followerActor.path().toString());
608
609             final int followersLastIndex = 2;
610             final int snapshotIndex = 3;
611             final int snapshotTerm = 1;
612             final int currentTerm = 2;
613
614             MockRaftActorContext actorContext =
615                 (MockRaftActorContext) createActorContext();
616             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
617                 @Override
618                 public int getSnapshotChunkSize() {
619                     return 50;
620                 }
621             };
622             configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
623             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
624
625             actorContext.setConfigParams(configParams);
626             actorContext.setPeerAddresses(peerAddresses);
627             actorContext.setCommitIndex(followersLastIndex);
628
629             MockLeader leader = new MockLeader(actorContext);
630
631             Map<String, String> leadersSnapshot = new HashMap<>();
632             leadersSnapshot.put("1", "A");
633             leadersSnapshot.put("2", "B");
634             leadersSnapshot.put("3", "C");
635
636             // set the snapshot variables in replicatedlog
637             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
638             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
639             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
640
641             ByteString bs = toByteString(leadersSnapshot);
642             leader.setSnapshot(Optional.of(bs));
643
644             leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
645
646             List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
647                 InstallSnapshotMessages.InstallSnapshot.class);
648
649             assertEquals(1, objectList.size());
650
651             Object o = objectList.get(0);
652             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
653
654             InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
655
656             assertEquals(1, installSnapshot.getChunkIndex());
657             assertEquals(3, installSnapshot.getTotalChunks());
658
659             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
660                 "follower-reply", installSnapshot.getChunkIndex(), true));
661
662             objectList = MessageCollectorActor.getAllMatching(followerActor,
663                 InstallSnapshotMessages.InstallSnapshot.class);
664
665             assertEquals(2, objectList.size());
666
667             installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
668
669             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
670                 "follower-reply", installSnapshot.getChunkIndex(), true));
671
672             objectList = MessageCollectorActor.getAllMatching(followerActor,
673                 InstallSnapshotMessages.InstallSnapshot.class);
674
675             assertEquals(3, objectList.size());
676
677             installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
678
679             // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
680             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
681                 "follower-reply", installSnapshot.getChunkIndex(), true));
682
683             objectList = MessageCollectorActor.getAllMatching(followerActor,
684                 InstallSnapshotMessages.InstallSnapshot.class);
685
686             // Count should still stay at 3
687             assertEquals(3, objectList.size());
688         }};
689     }
690
691
692     @Test
693     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
694         new JavaTestKit(getSystem()) {{
695
696             TestActorRef<MessageCollectorActor> followerActor =
697                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
698
699             Map<String, String> peerAddresses = new HashMap<>();
700             peerAddresses.put(followerActor.path().toString(),
701                     followerActor.path().toString());
702
703             final int followersLastIndex = 2;
704             final int snapshotIndex = 3;
705             final int snapshotTerm = 1;
706             final int currentTerm = 2;
707
708             MockRaftActorContext actorContext =
709                     (MockRaftActorContext) createActorContext();
710
711             actorContext.setConfigParams(new DefaultConfigParamsImpl(){
712                 @Override
713                 public int getSnapshotChunkSize() {
714                     return 50;
715                 }
716             });
717             actorContext.setPeerAddresses(peerAddresses);
718             actorContext.setCommitIndex(followersLastIndex);
719
720             MockLeader leader = new MockLeader(actorContext);
721
722             Map<String, String> leadersSnapshot = new HashMap<>();
723             leadersSnapshot.put("1", "A");
724             leadersSnapshot.put("2", "B");
725             leadersSnapshot.put("3", "C");
726
727             // set the snapshot variables in replicatedlog
728             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
729             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
730             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
731
732             ByteString bs = toByteString(leadersSnapshot);
733             leader.setSnapshot(Optional.of(bs));
734
735             leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
736
737             Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
738
739             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
740
741             InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
742
743             assertEquals(1, installSnapshot.getChunkIndex());
744             assertEquals(3, installSnapshot.getTotalChunks());
745
746
747             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
748                 followerActor.path().toString(), -1, false));
749
750             Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
751                 TimeUnit.MILLISECONDS);
752
753             leader.handleMessage(leaderActor, new SendHeartBeat());
754
755             o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
756
757             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
758
759             installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
760
761             assertEquals(1, installSnapshot.getChunkIndex());
762             assertEquals(3, installSnapshot.getTotalChunks());
763
764             followerActor.tell(PoisonPill.getInstance(), getRef());
765         }};
766     }
767
768     @Test
769     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
770         new JavaTestKit(getSystem()) {
771             {
772
773                 TestActorRef<MessageCollectorActor> followerActor =
774                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
775
776                 Map<String, String> peerAddresses = new HashMap<>();
777                 peerAddresses.put(followerActor.path().toString(),
778                         followerActor.path().toString());
779
780                 final int followersLastIndex = 2;
781                 final int snapshotIndex = 3;
782                 final int snapshotTerm = 1;
783                 final int currentTerm = 2;
784
785                 MockRaftActorContext actorContext =
786                         (MockRaftActorContext) createActorContext();
787
788                 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
789                     @Override
790                     public int getSnapshotChunkSize() {
791                         return 50;
792                     }
793                 });
794                 actorContext.setPeerAddresses(peerAddresses);
795                 actorContext.setCommitIndex(followersLastIndex);
796
797                 MockLeader leader = new MockLeader(actorContext);
798
799                 Map<String, String> leadersSnapshot = new HashMap<>();
800                 leadersSnapshot.put("1", "A");
801                 leadersSnapshot.put("2", "B");
802                 leadersSnapshot.put("3", "C");
803
804                 // set the snapshot variables in replicatedlog
805                 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
806                 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
807                 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
808
809                 ByteString bs = toByteString(leadersSnapshot);
810                 leader.setSnapshot(Optional.of(bs));
811
812                 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
813
814                 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
815
816                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
817
818                 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
819
820                 assertEquals(1, installSnapshot.getChunkIndex());
821                 assertEquals(3, installSnapshot.getTotalChunks());
822                 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
823
824                 int hashCode = installSnapshot.getData().hashCode();
825
826                 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
827
828                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
829
830                 leader.handleMessage(leaderActor, new SendHeartBeat());
831
832                 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
833
834                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
835
836                 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
837
838                 assertEquals(2, installSnapshot.getChunkIndex());
839                 assertEquals(3, installSnapshot.getTotalChunks());
840                 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
841
842                 followerActor.tell(PoisonPill.getInstance(), getRef());
843             }};
844     }
845
846     @Test
847     public void testFollowerToSnapshotLogic() {
848
849         MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
850
851         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
852             @Override
853             public int getSnapshotChunkSize() {
854                 return 50;
855             }
856         });
857
858         MockLeader leader = new MockLeader(actorContext);
859
860         Map<String, String> leadersSnapshot = new HashMap<>();
861         leadersSnapshot.put("1", "A");
862         leadersSnapshot.put("2", "B");
863         leadersSnapshot.put("3", "C");
864
865         ByteString bs = toByteString(leadersSnapshot);
866         byte[] barray = bs.toByteArray();
867
868         leader.createFollowerToSnapshot("followerId", bs);
869         assertEquals(bs.size(), barray.length);
870
871         int chunkIndex=0;
872         for (int i=0; i < barray.length; i = i + 50) {
873             int j = i + 50;
874             chunkIndex++;
875
876             if (i + 50 > barray.length) {
877                 j = barray.length;
878             }
879
880             ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
881             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
882             assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
883
884             leader.getFollowerToSnapshot().markSendStatus(true);
885             if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
886                 leader.getFollowerToSnapshot().incrementChunkIndex();
887             }
888         }
889
890         assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
891     }
892
893
894     @Override protected RaftActorBehavior createBehavior(
895         RaftActorContext actorContext) {
896         return new Leader(actorContext);
897     }
898
899     @Override protected RaftActorContext createActorContext() {
900         return createActorContext(leaderActor);
901     }
902
903     @Override
904     protected RaftActorContext createActorContext(ActorRef actorRef) {
905         return new MockRaftActorContext("test", getSystem(), actorRef);
906     }
907
908     private ByteString toByteString(Map<String, String> state) {
909         ByteArrayOutputStream b = null;
910         ObjectOutputStream o = null;
911         try {
912             try {
913                 b = new ByteArrayOutputStream();
914                 o = new ObjectOutputStream(b);
915                 o.writeObject(state);
916                 byte[] snapshotBytes = b.toByteArray();
917                 return ByteString.copyFrom(snapshotBytes);
918             } finally {
919                 if (o != null) {
920                     o.flush();
921                     o.close();
922                 }
923                 if (b != null) {
924                     b.close();
925                 }
926             }
927         } catch (IOException e) {
928             Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
929         }
930         return null;
931     }
932
933     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
934         private static AbstractRaftActorBehavior behavior;
935
936         public ForwardMessageToBehaviorActor(){
937
938         }
939
940         @Override public void onReceive(Object message) throws Exception {
941             super.onReceive(message);
942             behavior.handleMessage(sender(), message);
943         }
944
945         public static void setBehavior(AbstractRaftActorBehavior behavior){
946             ForwardMessageToBehaviorActor.behavior = behavior;
947         }
948     }
949
950     @Test
951     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
952         new JavaTestKit(getSystem()) {{
953
954             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
955
956             MockRaftActorContext leaderActorContext =
957                 new MockRaftActorContext("leader", getSystem(), leaderActor);
958
959             ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
960
961             MockRaftActorContext followerActorContext =
962                 new MockRaftActorContext("follower", getSystem(), followerActor);
963
964             Follower follower = new Follower(followerActorContext);
965
966             ForwardMessageToBehaviorActor.setBehavior(follower);
967
968             Map<String, String> peerAddresses = new HashMap<>();
969             peerAddresses.put(followerActor.path().toString(),
970                 followerActor.path().toString());
971
972             leaderActorContext.setPeerAddresses(peerAddresses);
973
974             leaderActorContext.getReplicatedLog().removeFrom(0);
975
976             //create 3 entries
977             leaderActorContext.setReplicatedLog(
978                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
979
980             leaderActorContext.setCommitIndex(1);
981
982             followerActorContext.getReplicatedLog().removeFrom(0);
983
984             // follower too has the exact same log entries and has the same commit index
985             followerActorContext.setReplicatedLog(
986                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
987
988             followerActorContext.setCommitIndex(1);
989
990             Leader leader = new Leader(leaderActorContext);
991             leader.markFollowerActive(followerActor.path().toString());
992
993             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
994                 TimeUnit.MILLISECONDS);
995
996             leader.handleMessage(leaderActor, new SendHeartBeat());
997
998             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
999                     .getFirstMatching(followerActor, AppendEntries.class);
1000
1001             assertNotNull(appendEntries);
1002
1003             assertEquals(1, appendEntries.getLeaderCommit());
1004             assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1005             assertEquals(0, appendEntries.getPrevLogIndex());
1006
1007             AppendEntriesReply appendEntriesReply =
1008                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
1009                     leaderActor, AppendEntriesReply.class);
1010
1011             assertNotNull(appendEntriesReply);
1012
1013             // follower returns its next index
1014             assertEquals(2, appendEntriesReply.getLogLastIndex());
1015             assertEquals(1, appendEntriesReply.getLogLastTerm());
1016
1017         }};
1018     }
1019
1020
1021     @Test
1022     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1023         new JavaTestKit(getSystem()) {{
1024
1025             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
1026
1027             MockRaftActorContext leaderActorContext =
1028                 new MockRaftActorContext("leader", getSystem(), leaderActor);
1029
1030             ActorRef followerActor = getSystem().actorOf(
1031                 Props.create(ForwardMessageToBehaviorActor.class));
1032
1033             MockRaftActorContext followerActorContext =
1034                 new MockRaftActorContext("follower", getSystem(), followerActor);
1035
1036             Follower follower = new Follower(followerActorContext);
1037
1038             ForwardMessageToBehaviorActor.setBehavior(follower);
1039
1040             Map<String, String> peerAddresses = new HashMap<>();
1041             peerAddresses.put(followerActor.path().toString(),
1042                 followerActor.path().toString());
1043
1044             leaderActorContext.setPeerAddresses(peerAddresses);
1045
1046             leaderActorContext.getReplicatedLog().removeFrom(0);
1047
1048             leaderActorContext.setReplicatedLog(
1049                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1050
1051             leaderActorContext.setCommitIndex(1);
1052
1053             followerActorContext.getReplicatedLog().removeFrom(0);
1054
1055             followerActorContext.setReplicatedLog(
1056                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1057
1058             // follower has the same log entries but its commit index > leaders commit index
1059             followerActorContext.setCommitIndex(2);
1060
1061             Leader leader = new Leader(leaderActorContext);
1062             leader.markFollowerActive(followerActor.path().toString());
1063
1064             Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
1065
1066             leader.handleMessage(leaderActor, new SendHeartBeat());
1067
1068             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
1069                     .getFirstMatching(followerActor, AppendEntries.class);
1070
1071             assertNotNull(appendEntries);
1072
1073             assertEquals(1, appendEntries.getLeaderCommit());
1074             assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1075             assertEquals(0, appendEntries.getPrevLogIndex());
1076
1077             AppendEntriesReply appendEntriesReply =
1078                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
1079                     leaderActor, AppendEntriesReply.class);
1080
1081             assertNotNull(appendEntriesReply);
1082
1083             assertEquals(2, appendEntriesReply.getLogLastIndex());
1084             assertEquals(1, appendEntriesReply.getLogLastTerm());
1085
1086         }};
1087     }
1088
1089     @Test
1090     public void testHandleAppendEntriesReplyFailure(){
1091         new JavaTestKit(getSystem()) {
1092             {
1093
1094                 ActorRef leaderActor =
1095                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1096
1097                 ActorRef followerActor =
1098                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1099
1100
1101                 MockRaftActorContext leaderActorContext =
1102                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1103
1104                 Map<String, String> peerAddresses = new HashMap<>();
1105                 peerAddresses.put("follower-1",
1106                     followerActor.path().toString());
1107
1108                 leaderActorContext.setPeerAddresses(peerAddresses);
1109
1110                 Leader leader = new Leader(leaderActorContext);
1111
1112                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1113
1114                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1115
1116                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1117
1118             }};
1119     }
1120
1121     @Test
1122     public void testHandleAppendEntriesReplySuccess() throws Exception {
1123         new JavaTestKit(getSystem()) {
1124             {
1125
1126                 ActorRef leaderActor =
1127                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1128
1129                 ActorRef followerActor =
1130                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1131
1132
1133                 MockRaftActorContext leaderActorContext =
1134                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1135
1136                 leaderActorContext.setReplicatedLog(
1137                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1138
1139                 Map<String, String> peerAddresses = new HashMap<>();
1140                 peerAddresses.put("follower-1",
1141                     followerActor.path().toString());
1142
1143                 leaderActorContext.setPeerAddresses(peerAddresses);
1144                 leaderActorContext.setCommitIndex(1);
1145                 leaderActorContext.setLastApplied(1);
1146                 leaderActorContext.getTermInformation().update(1, "leader");
1147
1148                 Leader leader = new Leader(leaderActorContext);
1149
1150                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1151
1152                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1153
1154                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1155
1156                 assertEquals(2, leaderActorContext.getCommitIndex());
1157
1158                 ApplyLogEntries applyLogEntries =
1159                     (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
1160                         ApplyLogEntries.class);
1161
1162                 assertNotNull(applyLogEntries);
1163
1164                 assertEquals(2, leaderActorContext.getLastApplied());
1165
1166                 assertEquals(2, applyLogEntries.getToIndex());
1167
1168                 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1169                     ApplyState.class);
1170
1171                 assertEquals(1,applyStateList.size());
1172
1173                 ApplyState applyState = (ApplyState) applyStateList.get(0);
1174
1175                 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1176
1177             }};
1178     }
1179
1180     @Test
1181     public void testHandleAppendEntriesReplyUnknownFollower(){
1182         new JavaTestKit(getSystem()) {
1183             {
1184
1185                 ActorRef leaderActor =
1186                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1187
1188                 MockRaftActorContext leaderActorContext =
1189                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1190
1191                 Leader leader = new Leader(leaderActorContext);
1192
1193                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1194
1195                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1196
1197                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1198
1199             }};
1200     }
1201
1202     @Test
1203     public void testHandleRequestVoteReply(){
1204         new JavaTestKit(getSystem()) {
1205             {
1206
1207                 ActorRef leaderActor =
1208                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1209
1210                 MockRaftActorContext leaderActorContext =
1211                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1212
1213                 Leader leader = new Leader(leaderActorContext);
1214
1215                 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1216
1217                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1218
1219                 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1220
1221                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1222             }};
1223     }
1224
1225     @Test
1226     public void testIsolatedLeaderCheckNoFollowers() {
1227         new JavaTestKit(getSystem()) {{
1228             ActorRef leaderActor = getTestActor();
1229
1230             MockRaftActorContext leaderActorContext =
1231                 new MockRaftActorContext("leader", getSystem(), leaderActor);
1232
1233             Map<String, String> peerAddresses = new HashMap<>();
1234             leaderActorContext.setPeerAddresses(peerAddresses);
1235
1236             Leader leader = new Leader(leaderActorContext);
1237             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1238             Assert.assertTrue(behavior instanceof Leader);
1239         }};
1240     }
1241
1242     @Test
1243     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1244         new JavaTestKit(getSystem()) {{
1245
1246             ActorRef followerActor1 = getTestActor();
1247             ActorRef followerActor2 = getTestActor();
1248
1249             MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1250
1251             Map<String, String> peerAddresses = new HashMap<>();
1252             peerAddresses.put("follower-1", followerActor1.path().toString());
1253             peerAddresses.put("follower-2", followerActor2.path().toString());
1254
1255             leaderActorContext.setPeerAddresses(peerAddresses);
1256
1257             Leader leader = new Leader(leaderActorContext);
1258             leader.stopIsolatedLeaderCheckSchedule();
1259
1260             leader.markFollowerActive("follower-1");
1261             leader.markFollowerActive("follower-2");
1262             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1263             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1264                 behavior instanceof Leader);
1265
1266             // kill 1 follower and verify if that got killed
1267             final JavaTestKit probe = new JavaTestKit(getSystem());
1268             probe.watch(followerActor1);
1269             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1270             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1271             assertEquals(termMsg1.getActor(), followerActor1);
1272
1273             leader.markFollowerInActive("follower-1");
1274             leader.markFollowerActive("follower-2");
1275             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1276             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1277                 behavior instanceof Leader);
1278
1279             // kill 2nd follower and leader should change to Isolated leader
1280             followerActor2.tell(PoisonPill.getInstance(), null);
1281             probe.watch(followerActor2);
1282             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1283             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1284             assertEquals(termMsg2.getActor(), followerActor2);
1285
1286             leader.markFollowerInActive("follower-2");
1287             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1288             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1289                 behavior instanceof IsolatedLeader);
1290
1291         }};
1292     }
1293
1294
1295     @Test
1296     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1297         new JavaTestKit(getSystem()) {{
1298
1299             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
1300
1301             MockRaftActorContext leaderActorContext =
1302                 new MockRaftActorContext("leader", getSystem(), leaderActor);
1303
1304             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1305             configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1306             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1307
1308             leaderActorContext.setConfigParams(configParams);
1309
1310             ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
1311
1312             MockRaftActorContext followerActorContext =
1313                 new MockRaftActorContext("follower-reply", getSystem(), followerActor);
1314
1315             followerActorContext.setConfigParams(configParams);
1316
1317             Follower follower = new Follower(followerActorContext);
1318
1319             ForwardMessageToBehaviorActor.setBehavior(follower);
1320
1321             Map<String, String> peerAddresses = new HashMap<>();
1322             peerAddresses.put("follower-reply",
1323                 followerActor.path().toString());
1324
1325             leaderActorContext.setPeerAddresses(peerAddresses);
1326
1327             leaderActorContext.getReplicatedLog().removeFrom(0);
1328
1329             //create 3 entries
1330             leaderActorContext.setReplicatedLog(
1331                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1332
1333             leaderActorContext.setCommitIndex(1);
1334
1335             Leader leader = new Leader(leaderActorContext);
1336             leader.markFollowerActive("follower-reply");
1337
1338             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1339                 TimeUnit.MILLISECONDS);
1340
1341             leader.handleMessage(leaderActor, new SendHeartBeat());
1342
1343             AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
1344                 .getFirstMatching(followerActor, AppendEntries.class);
1345
1346             assertNotNull(appendEntries);
1347
1348             assertEquals(1, appendEntries.getLeaderCommit());
1349             assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1350             assertEquals(0, appendEntries.getPrevLogIndex());
1351
1352             AppendEntriesReply appendEntriesReply =
1353                 (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
1354
1355             assertNotNull(appendEntriesReply);
1356
1357             leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1358
1359             List<Object> entries = ForwardMessageToBehaviorActor
1360                 .getAllMatching(followerActor, AppendEntries.class);
1361
1362             assertEquals("AppendEntries count should be 2 ", 2, entries.size());
1363
1364             AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
1365
1366             assertEquals(1, appendEntriesSecond.getLeaderCommit());
1367             assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
1368             assertEquals(1, appendEntriesSecond.getPrevLogIndex());
1369
1370         }};
1371     }
1372
1373     class MockLeader extends Leader {
1374
1375         FollowerToSnapshot fts;
1376
1377         public MockLeader(RaftActorContext context){
1378             super(context);
1379         }
1380
1381         public FollowerToSnapshot getFollowerToSnapshot() {
1382             return fts;
1383         }
1384
1385         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1386             fts = new FollowerToSnapshot(bs);
1387             setFollowerSnapshot(followerId, fts);
1388         }
1389     }
1390
1391     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1392
1393         private final long electionTimeOutIntervalMillis;
1394         private final int snapshotChunkSize;
1395
1396         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1397             super();
1398             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1399             this.snapshotChunkSize = snapshotChunkSize;
1400         }
1401
1402         @Override
1403         public FiniteDuration getElectionTimeOutInterval() {
1404             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1405         }
1406
1407         @Override
1408         public int getSnapshotChunkSize() {
1409             return snapshotChunkSize;
1410         }
1411     }
1412 }