Merge "(Bug 2035) - Increasing default Akka config for auto-down of unreachable nodes...
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Assert;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftState;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
47 import org.slf4j.impl.SimpleLogger;
48 import scala.concurrent.duration.FiniteDuration;
49
50 public class LeaderTest extends AbstractRaftActorBehaviorTest {
51
52     static {
53         // This enables trace logging for the tests.
54         System.setProperty(SimpleLogger.LOG_KEY_PREFIX + MockRaftActorContext.class.getName(), "trace");
55     }
56
57     private final ActorRef leaderActor =
58         getSystem().actorOf(Props.create(DoNothingActor.class));
59     private final ActorRef senderActor =
60         getSystem().actorOf(Props.create(DoNothingActor.class));
61
62     @Test
63     public void testHandleMessageForUnknownMessage() throws Exception {
64         new JavaTestKit(getSystem()) {{
65             Leader leader =
66                 new Leader(createActorContext());
67
68             // handle message should return the Leader state when it receives an
69             // unknown message
70             RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
71             Assert.assertTrue(behavior instanceof Leader);
72         }};
73     }
74
75     @Test
76     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
77         new JavaTestKit(getSystem()) {{
78             new Within(duration("1 seconds")) {
79                 @Override
80                 protected void run() {
81                     ActorRef followerActor = getTestActor();
82
83                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
84
85                     Map<String, String> peerAddresses = new HashMap<>();
86
87                     String followerId = "follower";
88                     peerAddresses.put(followerId, followerActor.path().toString());
89
90                     actorContext.setPeerAddresses(peerAddresses);
91
92                     long term = 1;
93                     actorContext.getTermInformation().update(term, "");
94
95                     Leader leader = new Leader(actorContext);
96
97                     // Leader should send an immediate heartbeat with no entries as follower is inactive.
98                     long lastIndex = actorContext.getReplicatedLog().lastIndex();
99                     AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
100                     assertEquals("getTerm", term, appendEntries.getTerm());
101                     assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
102                     assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
103                     assertEquals("Entries size", 0, appendEntries.getEntries().size());
104
105                     // The follower would normally reply - simulate that explicitly here.
106                     leader.handleMessage(followerActor, new AppendEntriesReply(
107                             followerId, term, true, lastIndex - 1, term));
108                     assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
109
110                     // Sleep for the heartbeat interval so AppendEntries is sent.
111                     Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
112                             getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
113
114                     leader.handleMessage(senderActor, new SendHeartBeat());
115
116                     appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
117                     assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
118                     assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
119                     assertEquals("Entries size", 1, appendEntries.getEntries().size());
120                     assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
121                     assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
122                 }
123             };
124         }};
125     }
126
127     @Test
128     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
129         new JavaTestKit(getSystem()) {{
130             new Within(duration("1 seconds")) {
131                 @Override
132                 protected void run() {
133                     ActorRef followerActor = getTestActor();
134
135                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
136
137                     Map<String, String> peerAddresses = new HashMap<>();
138
139                     String followerId = "follower";
140                     peerAddresses.put(followerId, followerActor.path().toString());
141
142                     actorContext.setPeerAddresses(peerAddresses);
143
144                     long term = 1;
145                     actorContext.getTermInformation().update(term, "");
146
147                     Leader leader = new Leader(actorContext);
148
149                     // Leader will send an immediate heartbeat - ignore it.
150                     expectMsgClass(duration("5 seconds"), AppendEntries.class);
151
152                     // The follower would normally reply - simulate that explicitly here.
153                     long lastIndex = actorContext.getReplicatedLog().lastIndex();
154                     leader.handleMessage(followerActor, new AppendEntriesReply(
155                             followerId, term, true, lastIndex, term));
156                     assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
157
158                     MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
159                     MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
160                             1, lastIndex + 1, payload);
161                     actorContext.getReplicatedLog().append(newEntry);
162                     RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
163                             new Replicate(null, null, newEntry));
164
165                     // State should not change
166                     assertTrue(raftBehavior instanceof Leader);
167
168                     AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
169                     assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
170                     assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
171                     assertEquals("Entries size", 1, appendEntries.getEntries().size());
172                     assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
173                     assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
174                     assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
175                 }
176             };
177         }};
178     }
179
180     @Test
181     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
182         new JavaTestKit(getSystem()) {{
183             new Within(duration("1 seconds")) {
184                 @Override
185                 protected void run() {
186
187                     ActorRef raftActor = getTestActor();
188
189                     MockRaftActorContext actorContext =
190                         new MockRaftActorContext("test", getSystem(), raftActor);
191
192                     actorContext.getReplicatedLog().removeFrom(0);
193
194                     actorContext.setReplicatedLog(
195                         new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
196                             .build());
197
198                     Leader leader = new Leader(actorContext);
199                     RaftActorBehavior raftBehavior = leader
200                         .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
201
202                     // State should not change
203                     assertTrue(raftBehavior instanceof Leader);
204
205                     assertEquals(1, actorContext.getCommitIndex());
206
207                     final String out =
208                         new ExpectMsg<String>(duration("1 seconds"),
209                             "match hint") {
210                             // do not put code outside this method, will run afterwards
211                             @Override
212                             protected String match(Object in) {
213                                 if (in instanceof ApplyState) {
214                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
215                                         return "match";
216                                     }
217                                     return null;
218                                 } else {
219                                     throw noMatch();
220                                 }
221                             }
222                         }.get(); // this extracts the received message
223
224                     assertEquals("match", out);
225
226                 }
227             };
228         }};
229     }
230
231     @Test
232     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
233         new JavaTestKit(getSystem()) {{
234             ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
235
236             Map<String, String> peerAddresses = new HashMap<>();
237             peerAddresses.put(followerActor.path().toString(),
238                 followerActor.path().toString());
239
240             MockRaftActorContext actorContext =
241                 (MockRaftActorContext) createActorContext(leaderActor);
242             actorContext.setPeerAddresses(peerAddresses);
243
244             Map<String, String> leadersSnapshot = new HashMap<>();
245             leadersSnapshot.put("1", "A");
246             leadersSnapshot.put("2", "B");
247             leadersSnapshot.put("3", "C");
248
249             //clears leaders log
250             actorContext.getReplicatedLog().removeFrom(0);
251
252             final int followersLastIndex = 2;
253             final int snapshotIndex = 3;
254             final int newEntryIndex = 4;
255             final int snapshotTerm = 1;
256             final int currentTerm = 2;
257
258             // set the snapshot variables in replicatedlog
259             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
260             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
261             actorContext.setCommitIndex(followersLastIndex);
262             //set follower timeout to 2 mins, helps during debugging
263             actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
264
265             MockLeader leader = new MockLeader(actorContext);
266
267             // new entry
268             ReplicatedLogImplEntry entry =
269                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
270                     new MockRaftActorContext.MockPayload("D"));
271
272             //update follower timestamp
273             leader.markFollowerActive(followerActor.path().toString());
274
275             ByteString bs = toByteString(leadersSnapshot);
276             leader.setSnapshot(Optional.of(bs));
277             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
278
279             //send first chunk and no InstallSnapshotReply received yet
280             leader.getFollowerToSnapshot().getNextChunk();
281             leader.getFollowerToSnapshot().incrementChunkIndex();
282
283             Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
284                 TimeUnit.MILLISECONDS);
285
286             leader.handleMessage(leaderActor, new SendHeartBeat());
287
288             AppendEntries aeproto = MessageCollectorActor.getFirstMatching(
289                 followerActor, AppendEntries.class);
290
291             assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
292                 "received", aeproto);
293
294             AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
295
296             assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
297
298             //InstallSnapshotReply received
299             leader.getFollowerToSnapshot().markSendStatus(true);
300
301             leader.handleMessage(senderActor, new SendHeartBeat());
302
303             InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.getFirstMatching(followerActor,
304                 InstallSnapshot.SERIALIZABLE_CLASS);
305
306             assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
307                 isproto);
308
309             InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
310
311             assertEquals(snapshotIndex, is.getLastIncludedIndex());
312
313         }};
314     }
315
316     @Test
317     public void testSendAppendEntriesSnapshotScenario() {
318         new JavaTestKit(getSystem()) {{
319
320             ActorRef followerActor = getTestActor();
321
322             Map<String, String> peerAddresses = new HashMap<>();
323             peerAddresses.put(followerActor.path().toString(),
324                 followerActor.path().toString());
325
326             MockRaftActorContext actorContext =
327                 (MockRaftActorContext) createActorContext(getRef());
328             actorContext.setPeerAddresses(peerAddresses);
329
330             Map<String, String> leadersSnapshot = new HashMap<>();
331             leadersSnapshot.put("1", "A");
332             leadersSnapshot.put("2", "B");
333             leadersSnapshot.put("3", "C");
334
335             //clears leaders log
336             actorContext.getReplicatedLog().removeFrom(0);
337
338             final int followersLastIndex = 2;
339             final int snapshotIndex = 3;
340             final int newEntryIndex = 4;
341             final int snapshotTerm = 1;
342             final int currentTerm = 2;
343
344             // set the snapshot variables in replicatedlog
345             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
346             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
347             actorContext.setCommitIndex(followersLastIndex);
348
349             Leader leader = new Leader(actorContext);
350
351             // new entry
352             ReplicatedLogImplEntry entry =
353                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
354                     new MockRaftActorContext.MockPayload("D"));
355
356             //update follower timestamp
357             leader.markFollowerActive(followerActor.path().toString());
358
359             Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
360                 TimeUnit.MILLISECONDS);
361
362             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
363             RaftActorBehavior raftBehavior = leader.handleMessage(
364                 senderActor, new Replicate(null, "state-id", entry));
365
366             assertTrue(raftBehavior instanceof Leader);
367
368             // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
369             Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
370                 @Override
371                 protected Boolean match(Object o) throws Exception {
372                     if (o instanceof InitiateInstallSnapshot) {
373                         return true;
374                     }
375                     return false;
376                 }
377             }.get();
378
379             boolean initiateInitiateInstallSnapshot = false;
380             for (Boolean b: matches) {
381                 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
382             }
383
384             assertTrue(initiateInitiateInstallSnapshot);
385         }};
386     }
387
388     @Test
389     public void testInitiateInstallSnapshot() throws Exception {
390         new JavaTestKit(getSystem()) {{
391
392             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
393
394             ActorRef followerActor = getTestActor();
395
396             Map<String, String> peerAddresses = new HashMap<>();
397             peerAddresses.put(followerActor.path().toString(),
398                 followerActor.path().toString());
399
400
401             MockRaftActorContext actorContext =
402                 (MockRaftActorContext) createActorContext(leaderActor);
403             actorContext.setPeerAddresses(peerAddresses);
404
405             Map<String, String> leadersSnapshot = new HashMap<>();
406             leadersSnapshot.put("1", "A");
407             leadersSnapshot.put("2", "B");
408             leadersSnapshot.put("3", "C");
409
410             //clears leaders log
411             actorContext.getReplicatedLog().removeFrom(0);
412
413             final int followersLastIndex = 2;
414             final int snapshotIndex = 3;
415             final int newEntryIndex = 4;
416             final int snapshotTerm = 1;
417             final int currentTerm = 2;
418
419             // set the snapshot variables in replicatedlog
420             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
421             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
422             actorContext.setLastApplied(3);
423             actorContext.setCommitIndex(followersLastIndex);
424
425             Leader leader = new Leader(actorContext);
426             // set the snapshot as absent and check if capture-snapshot is invoked.
427             leader.setSnapshot(Optional.<ByteString>absent());
428
429             // new entry
430             ReplicatedLogImplEntry entry =
431                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
432                     new MockRaftActorContext.MockPayload("D"));
433
434             actorContext.getReplicatedLog().append(entry);
435
436             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
437             RaftActorBehavior raftBehavior = leader.handleMessage(
438                 leaderActor, new InitiateInstallSnapshot());
439
440             CaptureSnapshot cs = MessageCollectorActor.
441                 getFirstMatching(leaderActor, CaptureSnapshot.class);
442
443             assertNotNull(cs);
444
445             assertTrue(cs.isInstallSnapshotInitiated());
446             assertEquals(3, cs.getLastAppliedIndex());
447             assertEquals(1, cs.getLastAppliedTerm());
448             assertEquals(4, cs.getLastIndex());
449             assertEquals(2, cs.getLastTerm());
450
451             // if an initiate is started again when first is in progress, it shouldnt initiate Capture
452             raftBehavior = leader.handleMessage(leaderActor, new InitiateInstallSnapshot());
453             List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
454             assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
455
456         }};
457     }
458
459     @Test
460     public void testInstallSnapshot() {
461         new JavaTestKit(getSystem()) {{
462
463             ActorRef followerActor = getTestActor();
464
465             Map<String, String> peerAddresses = new HashMap<>();
466             peerAddresses.put(followerActor.path().toString(),
467                 followerActor.path().toString());
468
469             MockRaftActorContext actorContext =
470                 (MockRaftActorContext) createActorContext();
471             actorContext.setPeerAddresses(peerAddresses);
472
473
474             Map<String, String> leadersSnapshot = new HashMap<>();
475             leadersSnapshot.put("1", "A");
476             leadersSnapshot.put("2", "B");
477             leadersSnapshot.put("3", "C");
478
479             //clears leaders log
480             actorContext.getReplicatedLog().removeFrom(0);
481
482             final int followersLastIndex = 2;
483             final int snapshotIndex = 3;
484             final int newEntryIndex = 4;
485             final int snapshotTerm = 1;
486             final int currentTerm = 2;
487
488             // set the snapshot variables in replicatedlog
489             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
490             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
491             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
492             actorContext.setCommitIndex(followersLastIndex);
493
494             Leader leader = new Leader(actorContext);
495
496             // Ignore initial heartbeat.
497             expectMsgClass(duration("5 seconds"), AppendEntries.class);
498
499             // new entry
500             ReplicatedLogImplEntry entry =
501                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
502                     new MockRaftActorContext.MockPayload("D"));
503
504             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
505                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
506
507             assertTrue(raftBehavior instanceof Leader);
508
509             // check if installsnapshot gets called with the correct values.
510             final String out =
511                 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
512                     // do not put code outside this method, will run afterwards
513                     @Override
514                     protected String match(Object in) {
515                         if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
516                             InstallSnapshot is = (InstallSnapshot)
517                                 SerializationUtils.fromSerializable(in);
518                             if (is.getData() == null) {
519                                 return "InstallSnapshot data is null";
520                             }
521                             if (is.getLastIncludedIndex() != snapshotIndex) {
522                                 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
523                             }
524                             if (is.getLastIncludedTerm() != snapshotTerm) {
525                                 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
526                             }
527                             if (is.getTerm() == currentTerm) {
528                                 return is.getTerm() + "!=" + currentTerm;
529                             }
530
531                             return "match";
532
533                         } else {
534                             return "message mismatch:" + in.getClass();
535                         }
536                     }
537                 }.get(); // this extracts the received message
538
539             assertEquals("match", out);
540         }};
541     }
542
543     @Test
544     public void testHandleInstallSnapshotReplyLastChunk() {
545         new JavaTestKit(getSystem()) {{
546
547             ActorRef followerActor = getTestActor();
548
549             Map<String, String> peerAddresses = new HashMap<>();
550             peerAddresses.put(followerActor.path().toString(),
551                 followerActor.path().toString());
552
553             final int followersLastIndex = 2;
554             final int snapshotIndex = 3;
555             final int newEntryIndex = 4;
556             final int snapshotTerm = 1;
557             final int currentTerm = 2;
558
559             MockRaftActorContext actorContext =
560                 (MockRaftActorContext) createActorContext();
561             actorContext.setPeerAddresses(peerAddresses);
562             actorContext.setCommitIndex(followersLastIndex);
563
564             MockLeader leader = new MockLeader(actorContext);
565
566             // Ignore initial heartbeat.
567             expectMsgClass(duration("5 seconds"), AppendEntries.class);
568
569             Map<String, String> leadersSnapshot = new HashMap<>();
570             leadersSnapshot.put("1", "A");
571             leadersSnapshot.put("2", "B");
572             leadersSnapshot.put("3", "C");
573
574             // set the snapshot variables in replicatedlog
575
576             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
577             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
578             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
579
580             ByteString bs = toByteString(leadersSnapshot);
581             leader.setSnapshot(Optional.of(bs));
582             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
583             while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
584                 leader.getFollowerToSnapshot().getNextChunk();
585                 leader.getFollowerToSnapshot().incrementChunkIndex();
586             }
587
588             //clears leaders log
589             actorContext.getReplicatedLog().removeFrom(0);
590
591             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
592                 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
593                     leader.getFollowerToSnapshot().getChunkIndex(), true));
594
595             assertTrue(raftBehavior instanceof Leader);
596
597             assertEquals(0, leader.followerSnapshotSize());
598             assertEquals(1, leader.followerLogSize());
599             assertNotNull(leader.getFollower(followerActor.path().toString()));
600             FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
601             assertEquals(snapshotIndex, fli.getMatchIndex());
602             assertEquals(snapshotIndex, fli.getMatchIndex());
603             assertEquals(snapshotIndex + 1, fli.getNextIndex());
604         }};
605     }
606     @Test
607     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
608         new JavaTestKit(getSystem()) {{
609
610             TestActorRef<MessageCollectorActor> followerActor =
611                 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
612
613             Map<String, String> peerAddresses = new HashMap<>();
614             peerAddresses.put("follower-reply",
615                 followerActor.path().toString());
616
617             final int followersLastIndex = 2;
618             final int snapshotIndex = 3;
619             final int snapshotTerm = 1;
620             final int currentTerm = 2;
621
622             MockRaftActorContext actorContext =
623                 (MockRaftActorContext) createActorContext();
624             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
625                 @Override
626                 public int getSnapshotChunkSize() {
627                     return 50;
628                 }
629             };
630             configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
631             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
632
633             actorContext.setConfigParams(configParams);
634             actorContext.setPeerAddresses(peerAddresses);
635             actorContext.setCommitIndex(followersLastIndex);
636
637             MockLeader leader = new MockLeader(actorContext);
638
639             Map<String, String> leadersSnapshot = new HashMap<>();
640             leadersSnapshot.put("1", "A");
641             leadersSnapshot.put("2", "B");
642             leadersSnapshot.put("3", "C");
643
644             // set the snapshot variables in replicatedlog
645             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
646             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
647             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
648
649             ByteString bs = toByteString(leadersSnapshot);
650             leader.setSnapshot(Optional.of(bs));
651
652             leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
653
654             List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
655                 InstallSnapshotMessages.InstallSnapshot.class);
656
657             assertEquals(1, objectList.size());
658
659             Object o = objectList.get(0);
660             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
661
662             InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
663
664             assertEquals(1, installSnapshot.getChunkIndex());
665             assertEquals(3, installSnapshot.getTotalChunks());
666
667             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
668                 "follower-reply", installSnapshot.getChunkIndex(), true));
669
670             objectList = MessageCollectorActor.getAllMatching(followerActor,
671                 InstallSnapshotMessages.InstallSnapshot.class);
672
673             assertEquals(2, objectList.size());
674
675             installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
676
677             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
678                 "follower-reply", installSnapshot.getChunkIndex(), true));
679
680             objectList = MessageCollectorActor.getAllMatching(followerActor,
681                 InstallSnapshotMessages.InstallSnapshot.class);
682
683             assertEquals(3, objectList.size());
684
685             installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
686
687             // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
688             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
689                 "follower-reply", installSnapshot.getChunkIndex(), true));
690
691             objectList = MessageCollectorActor.getAllMatching(followerActor,
692                 InstallSnapshotMessages.InstallSnapshot.class);
693
694             // Count should still stay at 3
695             assertEquals(3, objectList.size());
696         }};
697     }
698
699
700     @Test
701     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
702         new JavaTestKit(getSystem()) {{
703
704             TestActorRef<MessageCollectorActor> followerActor =
705                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
706
707             Map<String, String> peerAddresses = new HashMap<>();
708             peerAddresses.put(followerActor.path().toString(),
709                     followerActor.path().toString());
710
711             final int followersLastIndex = 2;
712             final int snapshotIndex = 3;
713             final int snapshotTerm = 1;
714             final int currentTerm = 2;
715
716             MockRaftActorContext actorContext =
717                     (MockRaftActorContext) createActorContext();
718
719             actorContext.setConfigParams(new DefaultConfigParamsImpl(){
720                 @Override
721                 public int getSnapshotChunkSize() {
722                     return 50;
723                 }
724             });
725             actorContext.setPeerAddresses(peerAddresses);
726             actorContext.setCommitIndex(followersLastIndex);
727
728             MockLeader leader = new MockLeader(actorContext);
729
730             Map<String, String> leadersSnapshot = new HashMap<>();
731             leadersSnapshot.put("1", "A");
732             leadersSnapshot.put("2", "B");
733             leadersSnapshot.put("3", "C");
734
735             // set the snapshot variables in replicatedlog
736             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
737             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
738             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
739
740             ByteString bs = toByteString(leadersSnapshot);
741             leader.setSnapshot(Optional.of(bs));
742
743             leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
744
745             MessageCollectorActor.getAllMatching(followerActor,
746                     InstallSnapshotMessages.InstallSnapshot.class);
747
748             InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
749                     followerActor, InstallSnapshotMessages.InstallSnapshot.class);
750             assertNotNull(installSnapshot);
751
752             assertEquals(1, installSnapshot.getChunkIndex());
753             assertEquals(3, installSnapshot.getTotalChunks());
754
755             followerActor.underlyingActor().clear();
756
757             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
758                 followerActor.path().toString(), -1, false));
759
760             Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
761                 TimeUnit.MILLISECONDS);
762
763             leader.handleMessage(leaderActor, new SendHeartBeat());
764
765             installSnapshot = MessageCollectorActor.getFirstMatching(
766                     followerActor, InstallSnapshotMessages.InstallSnapshot.class);
767             assertNotNull(installSnapshot);
768
769             assertEquals(1, installSnapshot.getChunkIndex());
770             assertEquals(3, installSnapshot.getTotalChunks());
771
772             followerActor.tell(PoisonPill.getInstance(), getRef());
773         }};
774     }
775
776     @Test
777     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
778         new JavaTestKit(getSystem()) {
779             {
780                 TestActorRef<MessageCollectorActor> followerActor =
781                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
782
783                 Map<String, String> peerAddresses = new HashMap<>();
784                 peerAddresses.put(followerActor.path().toString(),
785                         followerActor.path().toString());
786
787                 final int followersLastIndex = 2;
788                 final int snapshotIndex = 3;
789                 final int snapshotTerm = 1;
790                 final int currentTerm = 2;
791
792                 MockRaftActorContext actorContext =
793                         (MockRaftActorContext) createActorContext();
794
795                 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
796                     @Override
797                     public int getSnapshotChunkSize() {
798                         return 50;
799                     }
800                 });
801                 actorContext.setPeerAddresses(peerAddresses);
802                 actorContext.setCommitIndex(followersLastIndex);
803
804                 MockLeader leader = new MockLeader(actorContext);
805
806                 Map<String, String> leadersSnapshot = new HashMap<>();
807                 leadersSnapshot.put("1", "A");
808                 leadersSnapshot.put("2", "B");
809                 leadersSnapshot.put("3", "C");
810
811                 // set the snapshot variables in replicatedlog
812                 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
813                 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
814                 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
815
816                 ByteString bs = toByteString(leadersSnapshot);
817                 leader.setSnapshot(Optional.of(bs));
818
819                 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
820
821                 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
822                         followerActor, InstallSnapshotMessages.InstallSnapshot.class);
823                 assertNotNull(installSnapshot);
824
825                 assertEquals(1, installSnapshot.getChunkIndex());
826                 assertEquals(3, installSnapshot.getTotalChunks());
827                 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
828
829                 int hashCode = installSnapshot.getData().hashCode();
830
831                 followerActor.underlyingActor().clear();
832
833                 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
834
835                 installSnapshot = MessageCollectorActor.getFirstMatching(
836                         followerActor, InstallSnapshotMessages.InstallSnapshot.class);
837                 assertNotNull(installSnapshot);
838
839                 assertEquals(2, installSnapshot.getChunkIndex());
840                 assertEquals(3, installSnapshot.getTotalChunks());
841                 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
842
843                 followerActor.tell(PoisonPill.getInstance(), getRef());
844             }};
845     }
846
847     @Test
848     public void testFollowerToSnapshotLogic() {
849
850         MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
851
852         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
853             @Override
854             public int getSnapshotChunkSize() {
855                 return 50;
856             }
857         });
858
859         MockLeader leader = new MockLeader(actorContext);
860
861         Map<String, String> leadersSnapshot = new HashMap<>();
862         leadersSnapshot.put("1", "A");
863         leadersSnapshot.put("2", "B");
864         leadersSnapshot.put("3", "C");
865
866         ByteString bs = toByteString(leadersSnapshot);
867         byte[] barray = bs.toByteArray();
868
869         leader.createFollowerToSnapshot("followerId", bs);
870         assertEquals(bs.size(), barray.length);
871
872         int chunkIndex=0;
873         for (int i=0; i < barray.length; i = i + 50) {
874             int j = i + 50;
875             chunkIndex++;
876
877             if (i + 50 > barray.length) {
878                 j = barray.length;
879             }
880
881             ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
882             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
883             assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
884
885             leader.getFollowerToSnapshot().markSendStatus(true);
886             if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
887                 leader.getFollowerToSnapshot().incrementChunkIndex();
888             }
889         }
890
891         assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
892     }
893
894
895     @Override protected RaftActorBehavior createBehavior(
896         RaftActorContext actorContext) {
897         return new Leader(actorContext);
898     }
899
900     @Override protected RaftActorContext createActorContext() {
901         return createActorContext(leaderActor);
902     }
903
904     @Override
905     protected RaftActorContext createActorContext(ActorRef actorRef) {
906         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
907         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
908         configParams.setElectionTimeoutFactor(100000);
909         MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), actorRef);
910         context.setConfigParams(configParams);
911         return context;
912     }
913
914     private ByteString toByteString(Map<String, String> state) {
915         ByteArrayOutputStream b = null;
916         ObjectOutputStream o = null;
917         try {
918             try {
919                 b = new ByteArrayOutputStream();
920                 o = new ObjectOutputStream(b);
921                 o.writeObject(state);
922                 byte[] snapshotBytes = b.toByteArray();
923                 return ByteString.copyFrom(snapshotBytes);
924             } finally {
925                 if (o != null) {
926                     o.flush();
927                     o.close();
928                 }
929                 if (b != null) {
930                     b.close();
931                 }
932             }
933         } catch (IOException e) {
934             Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
935         }
936         return null;
937     }
938
939     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
940         AbstractRaftActorBehavior behavior;
941
942         @Override public void onReceive(Object message) throws Exception {
943             if(behavior != null) {
944                 behavior.handleMessage(sender(), message);
945             }
946
947             super.onReceive(message);
948         }
949
950         public static Props props() {
951             return Props.create(ForwardMessageToBehaviorActor.class);
952         }
953     }
954
955     @Test
956     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
957         new JavaTestKit(getSystem()) {{
958             TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
959                     Props.create(ForwardMessageToBehaviorActor.class));
960
961             MockRaftActorContext leaderActorContext =
962                     new MockRaftActorContext("leader", getSystem(), leaderActor);
963
964             TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
965                     ForwardMessageToBehaviorActor.props());
966
967             MockRaftActorContext followerActorContext =
968                     new MockRaftActorContext("follower", getSystem(), followerActor);
969
970             Follower follower = new Follower(followerActorContext);
971             followerActor.underlyingActor().behavior = follower;
972
973             Map<String, String> peerAddresses = new HashMap<>();
974             peerAddresses.put("follower", followerActor.path().toString());
975
976             leaderActorContext.setPeerAddresses(peerAddresses);
977
978             leaderActorContext.getReplicatedLog().removeFrom(0);
979
980             //create 3 entries
981             leaderActorContext.setReplicatedLog(
982                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
983
984             leaderActorContext.setCommitIndex(1);
985
986             followerActorContext.getReplicatedLog().removeFrom(0);
987
988             // follower too has the exact same log entries and has the same commit index
989             followerActorContext.setReplicatedLog(
990                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
991
992             followerActorContext.setCommitIndex(1);
993
994             Leader leader = new Leader(leaderActorContext);
995
996             AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
997             assertNotNull(appendEntries);
998
999             assertEquals(1, appendEntries.getLeaderCommit());
1000             assertEquals(0, appendEntries.getEntries().size());
1001             assertEquals(0, appendEntries.getPrevLogIndex());
1002
1003             AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
1004                     leaderActor, AppendEntriesReply.class);
1005             assertNotNull(appendEntriesReply);
1006
1007             assertEquals(2, appendEntriesReply.getLogLastIndex());
1008             assertEquals(1, appendEntriesReply.getLogLastTerm());
1009
1010             // follower returns its next index
1011             assertEquals(2, appendEntriesReply.getLogLastIndex());
1012             assertEquals(1, appendEntriesReply.getLogLastTerm());
1013         }};
1014     }
1015
1016
1017     @Test
1018     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1019         new JavaTestKit(getSystem()) {{
1020             TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
1021                     Props.create(ForwardMessageToBehaviorActor.class));
1022
1023             MockRaftActorContext leaderActorContext =
1024                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1025
1026             TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
1027                     ForwardMessageToBehaviorActor.props());
1028
1029             MockRaftActorContext followerActorContext =
1030                     new MockRaftActorContext("follower", getSystem(), followerActor);
1031
1032             Follower follower = new Follower(followerActorContext);
1033             followerActor.underlyingActor().behavior = follower;
1034
1035             Map<String, String> peerAddresses = new HashMap<>();
1036             peerAddresses.put("follower", followerActor.path().toString());
1037
1038             leaderActorContext.setPeerAddresses(peerAddresses);
1039
1040             leaderActorContext.getReplicatedLog().removeFrom(0);
1041
1042             leaderActorContext.setReplicatedLog(
1043                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1044
1045             leaderActorContext.setCommitIndex(1);
1046
1047             followerActorContext.getReplicatedLog().removeFrom(0);
1048
1049             followerActorContext.setReplicatedLog(
1050                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1051
1052             // follower has the same log entries but its commit index > leaders commit index
1053             followerActorContext.setCommitIndex(2);
1054
1055             Leader leader = new Leader(leaderActorContext);
1056
1057             // Initial heartbeat
1058             AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1059             assertNotNull(appendEntries);
1060
1061             assertEquals(1, appendEntries.getLeaderCommit());
1062             assertEquals(0, appendEntries.getEntries().size());
1063             assertEquals(0, appendEntries.getPrevLogIndex());
1064
1065             AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
1066                     leaderActor, AppendEntriesReply.class);
1067             assertNotNull(appendEntriesReply);
1068
1069             assertEquals(2, appendEntriesReply.getLogLastIndex());
1070             assertEquals(1, appendEntriesReply.getLogLastTerm());
1071
1072             leaderActor.underlyingActor().behavior = leader;
1073             leader.handleMessage(followerActor, appendEntriesReply);
1074
1075             leaderActor.underlyingActor().clear();
1076             followerActor.underlyingActor().clear();
1077
1078             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1079                     TimeUnit.MILLISECONDS);
1080
1081             leader.handleMessage(leaderActor, new SendHeartBeat());
1082
1083             appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1084             assertNotNull(appendEntries);
1085
1086             assertEquals(1, appendEntries.getLeaderCommit());
1087             assertEquals(0, appendEntries.getEntries().size());
1088             assertEquals(2, appendEntries.getPrevLogIndex());
1089
1090             appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
1091             assertNotNull(appendEntriesReply);
1092
1093             assertEquals(2, appendEntriesReply.getLogLastIndex());
1094             assertEquals(1, appendEntriesReply.getLogLastTerm());
1095
1096             assertEquals(1, followerActorContext.getCommitIndex());
1097         }};
1098     }
1099
1100     @Test
1101     public void testHandleAppendEntriesReplyFailure(){
1102         new JavaTestKit(getSystem()) {
1103             {
1104
1105                 ActorRef leaderActor =
1106                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1107
1108                 ActorRef followerActor =
1109                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1110
1111
1112                 MockRaftActorContext leaderActorContext =
1113                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1114
1115                 Map<String, String> peerAddresses = new HashMap<>();
1116                 peerAddresses.put("follower-1",
1117                     followerActor.path().toString());
1118
1119                 leaderActorContext.setPeerAddresses(peerAddresses);
1120
1121                 Leader leader = new Leader(leaderActorContext);
1122
1123                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1124
1125                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1126
1127                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1128
1129             }};
1130     }
1131
1132     @Test
1133     public void testHandleAppendEntriesReplySuccess() throws Exception {
1134         new JavaTestKit(getSystem()) {
1135             {
1136
1137                 ActorRef leaderActor =
1138                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1139
1140                 ActorRef followerActor =
1141                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1142
1143
1144                 MockRaftActorContext leaderActorContext =
1145                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1146
1147                 leaderActorContext.setReplicatedLog(
1148                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1149
1150                 Map<String, String> peerAddresses = new HashMap<>();
1151                 peerAddresses.put("follower-1",
1152                     followerActor.path().toString());
1153
1154                 leaderActorContext.setPeerAddresses(peerAddresses);
1155                 leaderActorContext.setCommitIndex(1);
1156                 leaderActorContext.setLastApplied(1);
1157                 leaderActorContext.getTermInformation().update(1, "leader");
1158
1159                 Leader leader = new Leader(leaderActorContext);
1160
1161                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1162
1163                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1164
1165                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1166
1167                 assertEquals(2, leaderActorContext.getCommitIndex());
1168
1169                 ApplyLogEntries applyLogEntries =
1170                     MessageCollectorActor.getFirstMatching(leaderActor,
1171                     ApplyLogEntries.class);
1172
1173                 assertNotNull(applyLogEntries);
1174
1175                 assertEquals(2, leaderActorContext.getLastApplied());
1176
1177                 assertEquals(2, applyLogEntries.getToIndex());
1178
1179                 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1180                     ApplyState.class);
1181
1182                 assertEquals(1,applyStateList.size());
1183
1184                 ApplyState applyState = (ApplyState) applyStateList.get(0);
1185
1186                 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1187
1188             }};
1189     }
1190
1191     @Test
1192     public void testHandleAppendEntriesReplyUnknownFollower(){
1193         new JavaTestKit(getSystem()) {
1194             {
1195
1196                 ActorRef leaderActor =
1197                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1198
1199                 MockRaftActorContext leaderActorContext =
1200                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1201
1202                 Leader leader = new Leader(leaderActorContext);
1203
1204                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1205
1206                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1207
1208                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1209
1210             }};
1211     }
1212
1213     @Test
1214     public void testHandleRequestVoteReply(){
1215         new JavaTestKit(getSystem()) {
1216             {
1217
1218                 ActorRef leaderActor =
1219                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1220
1221                 MockRaftActorContext leaderActorContext =
1222                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1223
1224                 Leader leader = new Leader(leaderActorContext);
1225
1226                 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1227
1228                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1229
1230                 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1231
1232                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1233             }};
1234     }
1235
1236     @Test
1237     public void testIsolatedLeaderCheckNoFollowers() {
1238         new JavaTestKit(getSystem()) {{
1239             ActorRef leaderActor = getTestActor();
1240
1241             MockRaftActorContext leaderActorContext =
1242                 new MockRaftActorContext("leader", getSystem(), leaderActor);
1243
1244             Map<String, String> peerAddresses = new HashMap<>();
1245             leaderActorContext.setPeerAddresses(peerAddresses);
1246
1247             Leader leader = new Leader(leaderActorContext);
1248             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1249             Assert.assertTrue(behavior instanceof Leader);
1250         }};
1251     }
1252
1253     @Test
1254     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1255         new JavaTestKit(getSystem()) {{
1256
1257             ActorRef followerActor1 = getTestActor();
1258             ActorRef followerActor2 = getTestActor();
1259
1260             MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1261
1262             Map<String, String> peerAddresses = new HashMap<>();
1263             peerAddresses.put("follower-1", followerActor1.path().toString());
1264             peerAddresses.put("follower-2", followerActor2.path().toString());
1265
1266             leaderActorContext.setPeerAddresses(peerAddresses);
1267
1268             Leader leader = new Leader(leaderActorContext);
1269             leader.stopIsolatedLeaderCheckSchedule();
1270
1271             leader.markFollowerActive("follower-1");
1272             leader.markFollowerActive("follower-2");
1273             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1274             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1275                 behavior instanceof Leader);
1276
1277             // kill 1 follower and verify if that got killed
1278             final JavaTestKit probe = new JavaTestKit(getSystem());
1279             probe.watch(followerActor1);
1280             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1281             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1282             assertEquals(termMsg1.getActor(), followerActor1);
1283
1284             leader.markFollowerInActive("follower-1");
1285             leader.markFollowerActive("follower-2");
1286             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1287             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1288                 behavior instanceof Leader);
1289
1290             // kill 2nd follower and leader should change to Isolated leader
1291             followerActor2.tell(PoisonPill.getInstance(), null);
1292             probe.watch(followerActor2);
1293             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1294             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1295             assertEquals(termMsg2.getActor(), followerActor2);
1296
1297             leader.markFollowerInActive("follower-2");
1298             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1299             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1300                 behavior instanceof IsolatedLeader);
1301
1302         }};
1303     }
1304
1305
1306     @Test
1307     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1308         new JavaTestKit(getSystem()) {{
1309             TestActorRef<MessageCollectorActor> leaderActor = TestActorRef.create(getSystem(),
1310                     Props.create(MessageCollectorActor.class));
1311
1312             MockRaftActorContext leaderActorContext =
1313                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1314
1315             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1316             //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1317             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1318
1319             leaderActorContext.setConfigParams(configParams);
1320
1321             TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
1322                     ForwardMessageToBehaviorActor.props());
1323
1324             MockRaftActorContext followerActorContext =
1325                     new MockRaftActorContext("follower-reply", getSystem(), followerActor);
1326
1327             followerActorContext.setConfigParams(configParams);
1328
1329             Follower follower = new Follower(followerActorContext);
1330             followerActor.underlyingActor().behavior = follower;
1331
1332             Map<String, String> peerAddresses = new HashMap<>();
1333             peerAddresses.put("follower-reply",
1334                     followerActor.path().toString());
1335
1336             leaderActorContext.setPeerAddresses(peerAddresses);
1337
1338             leaderActorContext.getReplicatedLog().removeFrom(0);
1339             leaderActorContext.setCommitIndex(-1);
1340             leaderActorContext.setLastApplied(-1);
1341
1342             followerActorContext.getReplicatedLog().removeFrom(0);
1343             followerActorContext.setCommitIndex(-1);
1344             followerActorContext.setLastApplied(-1);
1345
1346             Leader leader = new Leader(leaderActorContext);
1347
1348             AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
1349                     leaderActor, AppendEntriesReply.class);
1350             assertNotNull(appendEntriesReply);
1351             System.out.println("appendEntriesReply: "+appendEntriesReply);
1352             leader.handleMessage(followerActor, appendEntriesReply);
1353
1354             // Clear initial heartbeat messages
1355
1356             leaderActor.underlyingActor().clear();
1357             followerActor.underlyingActor().clear();
1358
1359             // create 3 entries
1360             leaderActorContext.setReplicatedLog(
1361                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1362             leaderActorContext.setCommitIndex(1);
1363             leaderActorContext.setLastApplied(1);
1364
1365             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1366                     TimeUnit.MILLISECONDS);
1367
1368             leader.handleMessage(leaderActor, new SendHeartBeat());
1369
1370             AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1371             assertNotNull(appendEntries);
1372
1373             // Should send first log entry
1374             assertEquals(1, appendEntries.getLeaderCommit());
1375             assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1376             assertEquals(-1, appendEntries.getPrevLogIndex());
1377
1378             appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
1379             assertNotNull(appendEntriesReply);
1380
1381             assertEquals(1, appendEntriesReply.getLogLastTerm());
1382             assertEquals(0, appendEntriesReply.getLogLastIndex());
1383
1384             followerActor.underlyingActor().clear();
1385
1386             leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1387
1388             appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1389             assertNotNull(appendEntries);
1390
1391             // Should send second log entry
1392             assertEquals(1, appendEntries.getLeaderCommit());
1393             assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1394         }};
1395     }
1396
1397     class MockLeader extends Leader {
1398
1399         FollowerToSnapshot fts;
1400
1401         public MockLeader(RaftActorContext context){
1402             super(context);
1403         }
1404
1405         public FollowerToSnapshot getFollowerToSnapshot() {
1406             return fts;
1407         }
1408
1409         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1410             fts = new FollowerToSnapshot(bs);
1411             setFollowerSnapshot(followerId, fts);
1412         }
1413     }
1414
1415     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1416
1417         private final long electionTimeOutIntervalMillis;
1418         private final int snapshotChunkSize;
1419
1420         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1421             super();
1422             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1423             this.snapshotChunkSize = snapshotChunkSize;
1424         }
1425
1426         @Override
1427         public FiniteDuration getElectionTimeOutInterval() {
1428             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1429         }
1430
1431         @Override
1432         public int getSnapshotChunkSize() {
1433             return snapshotChunkSize;
1434         }
1435     }
1436 }