119b958799e815d21b22c18dae11e9ff7415d928
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Assert;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftState;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
35 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
40 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
43 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
44 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
45 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
46 import scala.concurrent.duration.FiniteDuration;
47
48 public class LeaderTest extends AbstractRaftActorBehaviorTest {
49
50     private final ActorRef leaderActor =
51         getSystem().actorOf(Props.create(DoNothingActor.class));
52     private final ActorRef senderActor =
53         getSystem().actorOf(Props.create(DoNothingActor.class));
54
55     @Test
56     public void testHandleMessageForUnknownMessage() throws Exception {
57         new JavaTestKit(getSystem()) {{
58             Leader leader =
59                 new Leader(createActorContext());
60
61             // handle message should return the Leader state when it receives an
62             // unknown message
63             RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
64             Assert.assertTrue(behavior instanceof Leader);
65         }};
66     }
67
68     @Test
69     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
70         new JavaTestKit(getSystem()) {{
71             new Within(duration("1 seconds")) {
72                 @Override
73                 protected void run() {
74                     ActorRef followerActor = getTestActor();
75
76                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
77
78                     Map<String, String> peerAddresses = new HashMap<>();
79
80                     String followerId = "follower";
81                     peerAddresses.put(followerId, followerActor.path().toString());
82
83                     actorContext.setPeerAddresses(peerAddresses);
84
85                     long term = 1;
86                     actorContext.getTermInformation().update(term, "");
87
88                     Leader leader = new Leader(actorContext);
89
90                     // Leader should send an immediate heartbeat with no entries as follower is inactive.
91                     long lastIndex = actorContext.getReplicatedLog().lastIndex();
92                     AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
93                     assertEquals("getTerm", term, appendEntries.getTerm());
94                     assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
95                     assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
96                     assertEquals("Entries size", 0, appendEntries.getEntries().size());
97
98                     // The follower would normally reply - simulate that explicitly here.
99                     leader.handleMessage(followerActor, new AppendEntriesReply(
100                             followerId, term, true, lastIndex - 1, term));
101                     assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
102
103                     // Sleep for the heartbeat interval so AppendEntries is sent.
104                     Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
105                             getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
106
107                     leader.handleMessage(senderActor, new SendHeartBeat());
108
109                     appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
110                     assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
111                     assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
112                     assertEquals("Entries size", 1, appendEntries.getEntries().size());
113                     assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
114                     assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
115                 }
116             };
117         }};
118     }
119
120     @Test
121     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
122         new JavaTestKit(getSystem()) {{
123             new Within(duration("1 seconds")) {
124                 @Override
125                 protected void run() {
126                     ActorRef followerActor = getTestActor();
127
128                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
129
130                     Map<String, String> peerAddresses = new HashMap<>();
131
132                     String followerId = "follower";
133                     peerAddresses.put(followerId, followerActor.path().toString());
134
135                     actorContext.setPeerAddresses(peerAddresses);
136
137                     long term = 1;
138                     actorContext.getTermInformation().update(term, "");
139
140                     Leader leader = new Leader(actorContext);
141
142                     // Leader will send an immediate heartbeat - ignore it.
143                     expectMsgClass(duration("5 seconds"), AppendEntries.class);
144
145                     // The follower would normally reply - simulate that explicitly here.
146                     long lastIndex = actorContext.getReplicatedLog().lastIndex();
147                     leader.handleMessage(followerActor, new AppendEntriesReply(
148                             followerId, term, true, lastIndex, term));
149                     assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
150
151                     MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
152                     MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
153                             1, lastIndex + 1, payload);
154                     actorContext.getReplicatedLog().append(newEntry);
155                     RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
156                             new Replicate(null, null, newEntry));
157
158                     // State should not change
159                     assertTrue(raftBehavior instanceof Leader);
160
161                     AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
162                     assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
163                     assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
164                     assertEquals("Entries size", 1, appendEntries.getEntries().size());
165                     assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
166                     assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
167                     assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
168                 }
169             };
170         }};
171     }
172
173     @Test
174     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
175         new JavaTestKit(getSystem()) {{
176             new Within(duration("1 seconds")) {
177                 @Override
178                 protected void run() {
179
180                     ActorRef raftActor = getTestActor();
181
182                     MockRaftActorContext actorContext =
183                         new MockRaftActorContext("test", getSystem(), raftActor);
184
185                     actorContext.getReplicatedLog().removeFrom(0);
186
187                     actorContext.setReplicatedLog(
188                         new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
189                             .build());
190
191                     Leader leader = new Leader(actorContext);
192                     RaftActorBehavior raftBehavior = leader
193                         .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
194
195                     // State should not change
196                     assertTrue(raftBehavior instanceof Leader);
197
198                     assertEquals(1, actorContext.getCommitIndex());
199
200                     final String out =
201                         new ExpectMsg<String>(duration("1 seconds"),
202                             "match hint") {
203                             // do not put code outside this method, will run afterwards
204                             @Override
205                             protected String match(Object in) {
206                                 if (in instanceof ApplyState) {
207                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
208                                         return "match";
209                                     }
210                                     return null;
211                                 } else {
212                                     throw noMatch();
213                                 }
214                             }
215                         }.get(); // this extracts the received message
216
217                     assertEquals("match", out);
218
219                 }
220             };
221         }};
222     }
223
224     @Test
225     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
226         new JavaTestKit(getSystem()) {{
227             ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
228
229             Map<String, String> peerAddresses = new HashMap<>();
230             peerAddresses.put(followerActor.path().toString(),
231                 followerActor.path().toString());
232
233             MockRaftActorContext actorContext =
234                 (MockRaftActorContext) createActorContext(leaderActor);
235             actorContext.setPeerAddresses(peerAddresses);
236
237             Map<String, String> leadersSnapshot = new HashMap<>();
238             leadersSnapshot.put("1", "A");
239             leadersSnapshot.put("2", "B");
240             leadersSnapshot.put("3", "C");
241
242             //clears leaders log
243             actorContext.getReplicatedLog().removeFrom(0);
244
245             final int followersLastIndex = 2;
246             final int snapshotIndex = 3;
247             final int newEntryIndex = 4;
248             final int snapshotTerm = 1;
249             final int currentTerm = 2;
250
251             // set the snapshot variables in replicatedlog
252             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
253             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
254             actorContext.setCommitIndex(followersLastIndex);
255             //set follower timeout to 2 mins, helps during debugging
256             actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
257
258             MockLeader leader = new MockLeader(actorContext);
259
260             // new entry
261             ReplicatedLogImplEntry entry =
262                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
263                     new MockRaftActorContext.MockPayload("D"));
264
265             //update follower timestamp
266             leader.markFollowerActive(followerActor.path().toString());
267
268             ByteString bs = toByteString(leadersSnapshot);
269             leader.setSnapshot(Optional.of(bs));
270             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
271
272             //send first chunk and no InstallSnapshotReply received yet
273             leader.getFollowerToSnapshot().getNextChunk();
274             leader.getFollowerToSnapshot().incrementChunkIndex();
275
276             Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
277                 TimeUnit.MILLISECONDS);
278
279             leader.handleMessage(leaderActor, new SendHeartBeat());
280
281             AppendEntries aeproto = MessageCollectorActor.getFirstMatching(
282                 followerActor, AppendEntries.class);
283
284             assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
285                 "received", aeproto);
286
287             AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
288
289             assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
290
291             //InstallSnapshotReply received
292             leader.getFollowerToSnapshot().markSendStatus(true);
293
294             leader.handleMessage(senderActor, new SendHeartBeat());
295
296             InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.getFirstMatching(followerActor,
297                 InstallSnapshot.SERIALIZABLE_CLASS);
298
299             assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
300                 isproto);
301
302             InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
303
304             assertEquals(snapshotIndex, is.getLastIncludedIndex());
305
306         }};
307     }
308
309     @Test
310     public void testSendAppendEntriesSnapshotScenario() {
311         new JavaTestKit(getSystem()) {{
312
313             ActorRef followerActor = getTestActor();
314
315             Map<String, String> peerAddresses = new HashMap<>();
316             peerAddresses.put(followerActor.path().toString(),
317                 followerActor.path().toString());
318
319             MockRaftActorContext actorContext =
320                 (MockRaftActorContext) createActorContext(getRef());
321             actorContext.setPeerAddresses(peerAddresses);
322
323             Map<String, String> leadersSnapshot = new HashMap<>();
324             leadersSnapshot.put("1", "A");
325             leadersSnapshot.put("2", "B");
326             leadersSnapshot.put("3", "C");
327
328             //clears leaders log
329             actorContext.getReplicatedLog().removeFrom(0);
330
331             final int followersLastIndex = 2;
332             final int snapshotIndex = 3;
333             final int newEntryIndex = 4;
334             final int snapshotTerm = 1;
335             final int currentTerm = 2;
336
337             // set the snapshot variables in replicatedlog
338             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
339             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
340             actorContext.setCommitIndex(followersLastIndex);
341
342             Leader leader = new Leader(actorContext);
343
344             // new entry
345             ReplicatedLogImplEntry entry =
346                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
347                     new MockRaftActorContext.MockPayload("D"));
348
349             //update follower timestamp
350             leader.markFollowerActive(followerActor.path().toString());
351
352             Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
353                 TimeUnit.MILLISECONDS);
354
355             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
356             RaftActorBehavior raftBehavior = leader.handleMessage(
357                 senderActor, new Replicate(null, "state-id", entry));
358
359             assertTrue(raftBehavior instanceof Leader);
360
361             // we might receive some heartbeat messages, so wait till we get CaptureSnapshot
362             Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
363                 @Override
364                 protected Boolean match(Object o) throws Exception {
365                     if (o instanceof CaptureSnapshot) {
366                         return true;
367                     }
368                     return false;
369                 }
370             }.get();
371
372             boolean captureSnapshot = false;
373             for (Boolean b: matches) {
374                 captureSnapshot = b | captureSnapshot;
375             }
376
377             assertTrue(captureSnapshot);
378         }};
379     }
380
381     @Test
382     public void testInitiateInstallSnapshot() throws Exception {
383         new JavaTestKit(getSystem()) {{
384
385             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
386
387             ActorRef followerActor = getTestActor();
388
389             Map<String, String> peerAddresses = new HashMap<>();
390             peerAddresses.put(followerActor.path().toString(), followerActor.path().toString());
391
392             MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(leaderActor);
393             actorContext.setPeerAddresses(peerAddresses);
394
395             Map<String, String> leadersSnapshot = new HashMap<>();
396             leadersSnapshot.put("1", "A");
397             leadersSnapshot.put("2", "B");
398             leadersSnapshot.put("3", "C");
399
400             //clears leaders log
401             actorContext.getReplicatedLog().removeFrom(0);
402
403             final int followersLastIndex = 2;
404             final int snapshotIndex = 3;
405             final int newEntryIndex = 4;
406             final int snapshotTerm = 1;
407             final int currentTerm = 2;
408
409             // set the snapshot variables in replicatedlog
410             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
411             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
412             actorContext.setLastApplied(3);
413             actorContext.setCommitIndex(followersLastIndex);
414
415             Leader leader = new Leader(actorContext);
416             // set the snapshot as absent and check if capture-snapshot is invoked.
417             leader.setSnapshot(Optional.<ByteString>absent());
418
419             // new entry
420             ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
421                     new MockRaftActorContext.MockPayload("D"));
422
423             actorContext.getReplicatedLog().append(entry);
424
425             //update follower timestamp
426             leader.markFollowerActive(followerActor.path().toString());
427
428             RaftActorBehavior raftBehavior = leader.handleMessage(
429                     senderActor, new Replicate(null, "state-id", entry));
430
431             CaptureSnapshot cs = MessageCollectorActor.
432                 getFirstMatching(leaderActor, CaptureSnapshot.class);
433
434             assertNotNull(cs);
435
436             assertTrue(cs.isInstallSnapshotInitiated());
437             assertEquals(3, cs.getLastAppliedIndex());
438             assertEquals(1, cs.getLastAppliedTerm());
439             assertEquals(4, cs.getLastIndex());
440             assertEquals(2, cs.getLastTerm());
441
442             // if an initiate is started again when first is in progress, it shouldnt initiate Capture
443             leader.handleMessage(senderActor, new Replicate(null, "state-id", entry));
444             List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
445             assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
446
447         }};
448     }
449
450     @Test
451     public void testInstallSnapshot() {
452         new JavaTestKit(getSystem()) {{
453
454             ActorRef followerActor = getTestActor();
455
456             Map<String, String> peerAddresses = new HashMap<>();
457             peerAddresses.put(followerActor.path().toString(),
458                 followerActor.path().toString());
459
460             MockRaftActorContext actorContext =
461                 (MockRaftActorContext) createActorContext();
462             actorContext.setPeerAddresses(peerAddresses);
463
464
465             Map<String, String> leadersSnapshot = new HashMap<>();
466             leadersSnapshot.put("1", "A");
467             leadersSnapshot.put("2", "B");
468             leadersSnapshot.put("3", "C");
469
470             //clears leaders log
471             actorContext.getReplicatedLog().removeFrom(0);
472
473             final int followersLastIndex = 2;
474             final int snapshotIndex = 3;
475             final int newEntryIndex = 4;
476             final int snapshotTerm = 1;
477             final int currentTerm = 2;
478
479             // set the snapshot variables in replicatedlog
480             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
481             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
482             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
483             actorContext.setCommitIndex(followersLastIndex);
484
485             Leader leader = new Leader(actorContext);
486
487             // Ignore initial heartbeat.
488             expectMsgClass(duration("5 seconds"), AppendEntries.class);
489
490             // new entry
491             ReplicatedLogImplEntry entry =
492                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
493                     new MockRaftActorContext.MockPayload("D"));
494
495             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
496                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
497
498             assertTrue(raftBehavior instanceof Leader);
499
500             // check if installsnapshot gets called with the correct values.
501             final String out =
502                 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
503                     // do not put code outside this method, will run afterwards
504                     @Override
505                     protected String match(Object in) {
506                         if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
507                             InstallSnapshot is = (InstallSnapshot)
508                                 SerializationUtils.fromSerializable(in);
509                             if (is.getData() == null) {
510                                 return "InstallSnapshot data is null";
511                             }
512                             if (is.getLastIncludedIndex() != snapshotIndex) {
513                                 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
514                             }
515                             if (is.getLastIncludedTerm() != snapshotTerm) {
516                                 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
517                             }
518                             if (is.getTerm() == currentTerm) {
519                                 return is.getTerm() + "!=" + currentTerm;
520                             }
521
522                             return "match";
523
524                         } else {
525                             return "message mismatch:" + in.getClass();
526                         }
527                     }
528                 }.get(); // this extracts the received message
529
530             assertEquals("match", out);
531         }};
532     }
533
534     @Test
535     public void testHandleInstallSnapshotReplyLastChunk() {
536         new JavaTestKit(getSystem()) {{
537
538             ActorRef followerActor = getTestActor();
539
540             Map<String, String> peerAddresses = new HashMap<>();
541             peerAddresses.put(followerActor.path().toString(),
542                 followerActor.path().toString());
543
544             final int followersLastIndex = 2;
545             final int snapshotIndex = 3;
546             final int newEntryIndex = 4;
547             final int snapshotTerm = 1;
548             final int currentTerm = 2;
549
550             MockRaftActorContext actorContext =
551                 (MockRaftActorContext) createActorContext();
552             actorContext.setPeerAddresses(peerAddresses);
553             actorContext.setCommitIndex(followersLastIndex);
554
555             MockLeader leader = new MockLeader(actorContext);
556
557             // Ignore initial heartbeat.
558             expectMsgClass(duration("5 seconds"), AppendEntries.class);
559
560             Map<String, String> leadersSnapshot = new HashMap<>();
561             leadersSnapshot.put("1", "A");
562             leadersSnapshot.put("2", "B");
563             leadersSnapshot.put("3", "C");
564
565             // set the snapshot variables in replicatedlog
566
567             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
568             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
569             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
570
571             ByteString bs = toByteString(leadersSnapshot);
572             leader.setSnapshot(Optional.of(bs));
573             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
574             while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
575                 leader.getFollowerToSnapshot().getNextChunk();
576                 leader.getFollowerToSnapshot().incrementChunkIndex();
577             }
578
579             //clears leaders log
580             actorContext.getReplicatedLog().removeFrom(0);
581
582             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
583                 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
584                     leader.getFollowerToSnapshot().getChunkIndex(), true));
585
586             assertTrue(raftBehavior instanceof Leader);
587
588             assertEquals(0, leader.followerSnapshotSize());
589             assertEquals(1, leader.followerLogSize());
590             assertNotNull(leader.getFollower(followerActor.path().toString()));
591             FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
592             assertEquals(snapshotIndex, fli.getMatchIndex());
593             assertEquals(snapshotIndex, fli.getMatchIndex());
594             assertEquals(snapshotIndex + 1, fli.getNextIndex());
595         }};
596     }
597     @Test
598     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
599         new JavaTestKit(getSystem()) {{
600
601             TestActorRef<MessageCollectorActor> followerActor =
602                 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
603
604             Map<String, String> peerAddresses = new HashMap<>();
605             peerAddresses.put("follower-reply",
606                 followerActor.path().toString());
607
608             final int followersLastIndex = 2;
609             final int snapshotIndex = 3;
610             final int snapshotTerm = 1;
611             final int currentTerm = 2;
612
613             MockRaftActorContext actorContext =
614                 (MockRaftActorContext) createActorContext();
615             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
616                 @Override
617                 public int getSnapshotChunkSize() {
618                     return 50;
619                 }
620             };
621             configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
622             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
623
624             actorContext.setConfigParams(configParams);
625             actorContext.setPeerAddresses(peerAddresses);
626             actorContext.setCommitIndex(followersLastIndex);
627
628             MockLeader leader = new MockLeader(actorContext);
629
630             Map<String, String> leadersSnapshot = new HashMap<>();
631             leadersSnapshot.put("1", "A");
632             leadersSnapshot.put("2", "B");
633             leadersSnapshot.put("3", "C");
634
635             // set the snapshot variables in replicatedlog
636             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
637             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
638             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
639
640             ByteString bs = toByteString(leadersSnapshot);
641             leader.setSnapshot(Optional.of(bs));
642
643             leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
644
645             List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
646                 InstallSnapshotMessages.InstallSnapshot.class);
647
648             assertEquals(1, objectList.size());
649
650             Object o = objectList.get(0);
651             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
652
653             InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
654
655             assertEquals(1, installSnapshot.getChunkIndex());
656             assertEquals(3, installSnapshot.getTotalChunks());
657
658             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
659                 "follower-reply", installSnapshot.getChunkIndex(), true));
660
661             objectList = MessageCollectorActor.getAllMatching(followerActor,
662                 InstallSnapshotMessages.InstallSnapshot.class);
663
664             assertEquals(2, objectList.size());
665
666             installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
667
668             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
669                 "follower-reply", installSnapshot.getChunkIndex(), true));
670
671             objectList = MessageCollectorActor.getAllMatching(followerActor,
672                 InstallSnapshotMessages.InstallSnapshot.class);
673
674             assertEquals(3, objectList.size());
675
676             installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
677
678             // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
679             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
680                 "follower-reply", installSnapshot.getChunkIndex(), true));
681
682             objectList = MessageCollectorActor.getAllMatching(followerActor,
683                 InstallSnapshotMessages.InstallSnapshot.class);
684
685             // Count should still stay at 3
686             assertEquals(3, objectList.size());
687         }};
688     }
689
690
691     @Test
692     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
693         new JavaTestKit(getSystem()) {{
694
695             TestActorRef<MessageCollectorActor> followerActor =
696                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
697
698             Map<String, String> peerAddresses = new HashMap<>();
699             peerAddresses.put(followerActor.path().toString(),
700                     followerActor.path().toString());
701
702             final int followersLastIndex = 2;
703             final int snapshotIndex = 3;
704             final int snapshotTerm = 1;
705             final int currentTerm = 2;
706
707             MockRaftActorContext actorContext =
708                     (MockRaftActorContext) createActorContext();
709
710             actorContext.setConfigParams(new DefaultConfigParamsImpl(){
711                 @Override
712                 public int getSnapshotChunkSize() {
713                     return 50;
714                 }
715             });
716             actorContext.setPeerAddresses(peerAddresses);
717             actorContext.setCommitIndex(followersLastIndex);
718
719             MockLeader leader = new MockLeader(actorContext);
720
721             Map<String, String> leadersSnapshot = new HashMap<>();
722             leadersSnapshot.put("1", "A");
723             leadersSnapshot.put("2", "B");
724             leadersSnapshot.put("3", "C");
725
726             // set the snapshot variables in replicatedlog
727             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
728             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
729             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
730
731             ByteString bs = toByteString(leadersSnapshot);
732             leader.setSnapshot(Optional.of(bs));
733
734             leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
735
736             MessageCollectorActor.getAllMatching(followerActor,
737                     InstallSnapshotMessages.InstallSnapshot.class);
738
739             InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
740                     followerActor, InstallSnapshotMessages.InstallSnapshot.class);
741             assertNotNull(installSnapshot);
742
743             assertEquals(1, installSnapshot.getChunkIndex());
744             assertEquals(3, installSnapshot.getTotalChunks());
745
746             followerActor.underlyingActor().clear();
747
748             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
749                 followerActor.path().toString(), -1, false));
750
751             Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
752                 TimeUnit.MILLISECONDS);
753
754             leader.handleMessage(leaderActor, new SendHeartBeat());
755
756             installSnapshot = MessageCollectorActor.getFirstMatching(
757                     followerActor, InstallSnapshotMessages.InstallSnapshot.class);
758             assertNotNull(installSnapshot);
759
760             assertEquals(1, installSnapshot.getChunkIndex());
761             assertEquals(3, installSnapshot.getTotalChunks());
762
763             followerActor.tell(PoisonPill.getInstance(), getRef());
764         }};
765     }
766
767     @Test
768     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
769         new JavaTestKit(getSystem()) {
770             {
771                 TestActorRef<MessageCollectorActor> followerActor =
772                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
773
774                 Map<String, String> peerAddresses = new HashMap<>();
775                 peerAddresses.put(followerActor.path().toString(),
776                         followerActor.path().toString());
777
778                 final int followersLastIndex = 2;
779                 final int snapshotIndex = 3;
780                 final int snapshotTerm = 1;
781                 final int currentTerm = 2;
782
783                 MockRaftActorContext actorContext =
784                         (MockRaftActorContext) createActorContext();
785
786                 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
787                     @Override
788                     public int getSnapshotChunkSize() {
789                         return 50;
790                     }
791                 });
792                 actorContext.setPeerAddresses(peerAddresses);
793                 actorContext.setCommitIndex(followersLastIndex);
794
795                 MockLeader leader = new MockLeader(actorContext);
796
797                 Map<String, String> leadersSnapshot = new HashMap<>();
798                 leadersSnapshot.put("1", "A");
799                 leadersSnapshot.put("2", "B");
800                 leadersSnapshot.put("3", "C");
801
802                 // set the snapshot variables in replicatedlog
803                 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
804                 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
805                 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
806
807                 ByteString bs = toByteString(leadersSnapshot);
808                 leader.setSnapshot(Optional.of(bs));
809
810                 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
811
812                 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
813                         followerActor, InstallSnapshotMessages.InstallSnapshot.class);
814                 assertNotNull(installSnapshot);
815
816                 assertEquals(1, installSnapshot.getChunkIndex());
817                 assertEquals(3, installSnapshot.getTotalChunks());
818                 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
819
820                 int hashCode = installSnapshot.getData().hashCode();
821
822                 followerActor.underlyingActor().clear();
823
824                 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
825
826                 installSnapshot = MessageCollectorActor.getFirstMatching(
827                         followerActor, InstallSnapshotMessages.InstallSnapshot.class);
828                 assertNotNull(installSnapshot);
829
830                 assertEquals(2, installSnapshot.getChunkIndex());
831                 assertEquals(3, installSnapshot.getTotalChunks());
832                 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
833
834                 followerActor.tell(PoisonPill.getInstance(), getRef());
835             }};
836     }
837
838     @Test
839     public void testFollowerToSnapshotLogic() {
840
841         MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
842
843         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
844             @Override
845             public int getSnapshotChunkSize() {
846                 return 50;
847             }
848         });
849
850         MockLeader leader = new MockLeader(actorContext);
851
852         Map<String, String> leadersSnapshot = new HashMap<>();
853         leadersSnapshot.put("1", "A");
854         leadersSnapshot.put("2", "B");
855         leadersSnapshot.put("3", "C");
856
857         ByteString bs = toByteString(leadersSnapshot);
858         byte[] barray = bs.toByteArray();
859
860         leader.createFollowerToSnapshot("followerId", bs);
861         assertEquals(bs.size(), barray.length);
862
863         int chunkIndex=0;
864         for (int i=0; i < barray.length; i = i + 50) {
865             int j = i + 50;
866             chunkIndex++;
867
868             if (i + 50 > barray.length) {
869                 j = barray.length;
870             }
871
872             ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
873             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
874             assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
875
876             leader.getFollowerToSnapshot().markSendStatus(true);
877             if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
878                 leader.getFollowerToSnapshot().incrementChunkIndex();
879             }
880         }
881
882         assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
883     }
884
885
886     @Override protected RaftActorBehavior createBehavior(
887         RaftActorContext actorContext) {
888         return new Leader(actorContext);
889     }
890
891     @Override protected RaftActorContext createActorContext() {
892         return createActorContext(leaderActor);
893     }
894
895     @Override
896     protected RaftActorContext createActorContext(ActorRef actorRef) {
897         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
898         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
899         configParams.setElectionTimeoutFactor(100000);
900         MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), actorRef);
901         context.setConfigParams(configParams);
902         return context;
903     }
904
905     private ByteString toByteString(Map<String, String> state) {
906         ByteArrayOutputStream b = null;
907         ObjectOutputStream o = null;
908         try {
909             try {
910                 b = new ByteArrayOutputStream();
911                 o = new ObjectOutputStream(b);
912                 o.writeObject(state);
913                 byte[] snapshotBytes = b.toByteArray();
914                 return ByteString.copyFrom(snapshotBytes);
915             } finally {
916                 if (o != null) {
917                     o.flush();
918                     o.close();
919                 }
920                 if (b != null) {
921                     b.close();
922                 }
923             }
924         } catch (IOException e) {
925             Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
926         }
927         return null;
928     }
929
930     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
931         AbstractRaftActorBehavior behavior;
932
933         @Override public void onReceive(Object message) throws Exception {
934             if(behavior != null) {
935                 behavior.handleMessage(sender(), message);
936             }
937
938             super.onReceive(message);
939         }
940
941         public static Props props() {
942             return Props.create(ForwardMessageToBehaviorActor.class);
943         }
944     }
945
946     @Test
947     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
948         new JavaTestKit(getSystem()) {{
949             TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
950                     Props.create(ForwardMessageToBehaviorActor.class));
951
952             MockRaftActorContext leaderActorContext =
953                     new MockRaftActorContext("leader", getSystem(), leaderActor);
954
955             TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
956                     ForwardMessageToBehaviorActor.props());
957
958             MockRaftActorContext followerActorContext =
959                     new MockRaftActorContext("follower", getSystem(), followerActor);
960
961             Follower follower = new Follower(followerActorContext);
962             followerActor.underlyingActor().behavior = follower;
963
964             Map<String, String> peerAddresses = new HashMap<>();
965             peerAddresses.put("follower", followerActor.path().toString());
966
967             leaderActorContext.setPeerAddresses(peerAddresses);
968
969             leaderActorContext.getReplicatedLog().removeFrom(0);
970
971             //create 3 entries
972             leaderActorContext.setReplicatedLog(
973                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
974
975             leaderActorContext.setCommitIndex(1);
976
977             followerActorContext.getReplicatedLog().removeFrom(0);
978
979             // follower too has the exact same log entries and has the same commit index
980             followerActorContext.setReplicatedLog(
981                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
982
983             followerActorContext.setCommitIndex(1);
984
985             Leader leader = new Leader(leaderActorContext);
986
987             AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
988             assertNotNull(appendEntries);
989
990             assertEquals(1, appendEntries.getLeaderCommit());
991             assertEquals(0, appendEntries.getEntries().size());
992             assertEquals(0, appendEntries.getPrevLogIndex());
993
994             AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
995                     leaderActor, AppendEntriesReply.class);
996             assertNotNull(appendEntriesReply);
997
998             assertEquals(2, appendEntriesReply.getLogLastIndex());
999             assertEquals(1, appendEntriesReply.getLogLastTerm());
1000
1001             // follower returns its next index
1002             assertEquals(2, appendEntriesReply.getLogLastIndex());
1003             assertEquals(1, appendEntriesReply.getLogLastTerm());
1004         }};
1005     }
1006
1007
1008     @Test
1009     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1010         new JavaTestKit(getSystem()) {{
1011             TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
1012                     Props.create(ForwardMessageToBehaviorActor.class));
1013
1014             MockRaftActorContext leaderActorContext =
1015                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1016
1017             TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
1018                     ForwardMessageToBehaviorActor.props());
1019
1020             MockRaftActorContext followerActorContext =
1021                     new MockRaftActorContext("follower", getSystem(), followerActor);
1022
1023             Follower follower = new Follower(followerActorContext);
1024             followerActor.underlyingActor().behavior = follower;
1025
1026             Map<String, String> peerAddresses = new HashMap<>();
1027             peerAddresses.put("follower", followerActor.path().toString());
1028
1029             leaderActorContext.setPeerAddresses(peerAddresses);
1030
1031             leaderActorContext.getReplicatedLog().removeFrom(0);
1032
1033             leaderActorContext.setReplicatedLog(
1034                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1035
1036             leaderActorContext.setCommitIndex(1);
1037
1038             followerActorContext.getReplicatedLog().removeFrom(0);
1039
1040             followerActorContext.setReplicatedLog(
1041                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1042
1043             // follower has the same log entries but its commit index > leaders commit index
1044             followerActorContext.setCommitIndex(2);
1045
1046             Leader leader = new Leader(leaderActorContext);
1047
1048             // Initial heartbeat
1049             AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1050             assertNotNull(appendEntries);
1051
1052             assertEquals(1, appendEntries.getLeaderCommit());
1053             assertEquals(0, appendEntries.getEntries().size());
1054             assertEquals(0, appendEntries.getPrevLogIndex());
1055
1056             AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
1057                     leaderActor, AppendEntriesReply.class);
1058             assertNotNull(appendEntriesReply);
1059
1060             assertEquals(2, appendEntriesReply.getLogLastIndex());
1061             assertEquals(1, appendEntriesReply.getLogLastTerm());
1062
1063             leaderActor.underlyingActor().behavior = leader;
1064             leader.handleMessage(followerActor, appendEntriesReply);
1065
1066             leaderActor.underlyingActor().clear();
1067             followerActor.underlyingActor().clear();
1068
1069             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1070                     TimeUnit.MILLISECONDS);
1071
1072             leader.handleMessage(leaderActor, new SendHeartBeat());
1073
1074             appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1075             assertNotNull(appendEntries);
1076
1077             assertEquals(1, appendEntries.getLeaderCommit());
1078             assertEquals(0, appendEntries.getEntries().size());
1079             assertEquals(2, appendEntries.getPrevLogIndex());
1080
1081             appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
1082             assertNotNull(appendEntriesReply);
1083
1084             assertEquals(2, appendEntriesReply.getLogLastIndex());
1085             assertEquals(1, appendEntriesReply.getLogLastTerm());
1086
1087             assertEquals(1, followerActorContext.getCommitIndex());
1088         }};
1089     }
1090
1091     @Test
1092     public void testHandleAppendEntriesReplyFailure(){
1093         new JavaTestKit(getSystem()) {
1094             {
1095
1096                 ActorRef leaderActor =
1097                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1098
1099                 ActorRef followerActor =
1100                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1101
1102
1103                 MockRaftActorContext leaderActorContext =
1104                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1105
1106                 Map<String, String> peerAddresses = new HashMap<>();
1107                 peerAddresses.put("follower-1",
1108                     followerActor.path().toString());
1109
1110                 leaderActorContext.setPeerAddresses(peerAddresses);
1111
1112                 Leader leader = new Leader(leaderActorContext);
1113
1114                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1115
1116                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1117
1118                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1119
1120             }};
1121     }
1122
1123     @Test
1124     public void testHandleAppendEntriesReplySuccess() throws Exception {
1125         new JavaTestKit(getSystem()) {
1126             {
1127
1128                 ActorRef leaderActor =
1129                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1130
1131                 ActorRef followerActor =
1132                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1133
1134
1135                 MockRaftActorContext leaderActorContext =
1136                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1137
1138                 leaderActorContext.setReplicatedLog(
1139                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1140
1141                 Map<String, String> peerAddresses = new HashMap<>();
1142                 peerAddresses.put("follower-1",
1143                     followerActor.path().toString());
1144
1145                 leaderActorContext.setPeerAddresses(peerAddresses);
1146                 leaderActorContext.setCommitIndex(1);
1147                 leaderActorContext.setLastApplied(1);
1148                 leaderActorContext.getTermInformation().update(1, "leader");
1149
1150                 Leader leader = new Leader(leaderActorContext);
1151
1152                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1153
1154                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1155
1156                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1157
1158                 assertEquals(2, leaderActorContext.getCommitIndex());
1159
1160                 ApplyLogEntries applyLogEntries =
1161                     MessageCollectorActor.getFirstMatching(leaderActor,
1162                     ApplyLogEntries.class);
1163
1164                 assertNotNull(applyLogEntries);
1165
1166                 assertEquals(2, leaderActorContext.getLastApplied());
1167
1168                 assertEquals(2, applyLogEntries.getToIndex());
1169
1170                 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1171                     ApplyState.class);
1172
1173                 assertEquals(1,applyStateList.size());
1174
1175                 ApplyState applyState = (ApplyState) applyStateList.get(0);
1176
1177                 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1178
1179             }};
1180     }
1181
1182     @Test
1183     public void testHandleAppendEntriesReplyUnknownFollower(){
1184         new JavaTestKit(getSystem()) {
1185             {
1186
1187                 ActorRef leaderActor =
1188                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1189
1190                 MockRaftActorContext leaderActorContext =
1191                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1192
1193                 Leader leader = new Leader(leaderActorContext);
1194
1195                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1196
1197                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1198
1199                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1200
1201             }};
1202     }
1203
1204     @Test
1205     public void testHandleRequestVoteReply(){
1206         new JavaTestKit(getSystem()) {
1207             {
1208
1209                 ActorRef leaderActor =
1210                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1211
1212                 MockRaftActorContext leaderActorContext =
1213                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1214
1215                 Leader leader = new Leader(leaderActorContext);
1216
1217                 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1218
1219                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1220
1221                 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1222
1223                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1224             }};
1225     }
1226
1227     @Test
1228     public void testIsolatedLeaderCheckNoFollowers() {
1229         new JavaTestKit(getSystem()) {{
1230             ActorRef leaderActor = getTestActor();
1231
1232             MockRaftActorContext leaderActorContext =
1233                 new MockRaftActorContext("leader", getSystem(), leaderActor);
1234
1235             Map<String, String> peerAddresses = new HashMap<>();
1236             leaderActorContext.setPeerAddresses(peerAddresses);
1237
1238             Leader leader = new Leader(leaderActorContext);
1239             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1240             Assert.assertTrue(behavior instanceof Leader);
1241         }};
1242     }
1243
1244     @Test
1245     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1246         new JavaTestKit(getSystem()) {{
1247
1248             ActorRef followerActor1 = getTestActor();
1249             ActorRef followerActor2 = getTestActor();
1250
1251             MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1252
1253             Map<String, String> peerAddresses = new HashMap<>();
1254             peerAddresses.put("follower-1", followerActor1.path().toString());
1255             peerAddresses.put("follower-2", followerActor2.path().toString());
1256
1257             leaderActorContext.setPeerAddresses(peerAddresses);
1258
1259             Leader leader = new Leader(leaderActorContext);
1260             leader.stopIsolatedLeaderCheckSchedule();
1261
1262             leader.markFollowerActive("follower-1");
1263             leader.markFollowerActive("follower-2");
1264             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1265             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1266                 behavior instanceof Leader);
1267
1268             // kill 1 follower and verify if that got killed
1269             final JavaTestKit probe = new JavaTestKit(getSystem());
1270             probe.watch(followerActor1);
1271             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1272             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1273             assertEquals(termMsg1.getActor(), followerActor1);
1274
1275             leader.markFollowerInActive("follower-1");
1276             leader.markFollowerActive("follower-2");
1277             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1278             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1279                 behavior instanceof Leader);
1280
1281             // kill 2nd follower and leader should change to Isolated leader
1282             followerActor2.tell(PoisonPill.getInstance(), null);
1283             probe.watch(followerActor2);
1284             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1285             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1286             assertEquals(termMsg2.getActor(), followerActor2);
1287
1288             leader.markFollowerInActive("follower-2");
1289             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1290             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1291                 behavior instanceof IsolatedLeader);
1292
1293         }};
1294     }
1295
1296
1297     @Test
1298     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1299         new JavaTestKit(getSystem()) {{
1300             TestActorRef<MessageCollectorActor> leaderActor = TestActorRef.create(getSystem(),
1301                     Props.create(MessageCollectorActor.class));
1302
1303             MockRaftActorContext leaderActorContext =
1304                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1305
1306             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1307             //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1308             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1309
1310             leaderActorContext.setConfigParams(configParams);
1311
1312             TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
1313                     ForwardMessageToBehaviorActor.props());
1314
1315             MockRaftActorContext followerActorContext =
1316                     new MockRaftActorContext("follower-reply", getSystem(), followerActor);
1317
1318             followerActorContext.setConfigParams(configParams);
1319
1320             Follower follower = new Follower(followerActorContext);
1321             followerActor.underlyingActor().behavior = follower;
1322
1323             Map<String, String> peerAddresses = new HashMap<>();
1324             peerAddresses.put("follower-reply",
1325                     followerActor.path().toString());
1326
1327             leaderActorContext.setPeerAddresses(peerAddresses);
1328
1329             leaderActorContext.getReplicatedLog().removeFrom(0);
1330             leaderActorContext.setCommitIndex(-1);
1331             leaderActorContext.setLastApplied(-1);
1332
1333             followerActorContext.getReplicatedLog().removeFrom(0);
1334             followerActorContext.setCommitIndex(-1);
1335             followerActorContext.setLastApplied(-1);
1336
1337             Leader leader = new Leader(leaderActorContext);
1338
1339             AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
1340                     leaderActor, AppendEntriesReply.class);
1341             assertNotNull(appendEntriesReply);
1342             System.out.println("appendEntriesReply: "+appendEntriesReply);
1343             leader.handleMessage(followerActor, appendEntriesReply);
1344
1345             // Clear initial heartbeat messages
1346
1347             leaderActor.underlyingActor().clear();
1348             followerActor.underlyingActor().clear();
1349
1350             // create 3 entries
1351             leaderActorContext.setReplicatedLog(
1352                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1353             leaderActorContext.setCommitIndex(1);
1354             leaderActorContext.setLastApplied(1);
1355
1356             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1357                     TimeUnit.MILLISECONDS);
1358
1359             leader.handleMessage(leaderActor, new SendHeartBeat());
1360
1361             AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1362             assertNotNull(appendEntries);
1363
1364             // Should send first log entry
1365             assertEquals(1, appendEntries.getLeaderCommit());
1366             assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1367             assertEquals(-1, appendEntries.getPrevLogIndex());
1368
1369             appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
1370             assertNotNull(appendEntriesReply);
1371
1372             assertEquals(1, appendEntriesReply.getLogLastTerm());
1373             assertEquals(0, appendEntriesReply.getLogLastIndex());
1374
1375             followerActor.underlyingActor().clear();
1376
1377             leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1378
1379             appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1380             assertNotNull(appendEntries);
1381
1382             // Should send second log entry
1383             assertEquals(1, appendEntries.getLeaderCommit());
1384             assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1385         }};
1386     }
1387
1388     class MockLeader extends Leader {
1389
1390         FollowerToSnapshot fts;
1391
1392         public MockLeader(RaftActorContext context){
1393             super(context);
1394         }
1395
1396         public FollowerToSnapshot getFollowerToSnapshot() {
1397             return fts;
1398         }
1399
1400         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1401             fts = new FollowerToSnapshot(bs);
1402             setFollowerSnapshot(followerId, fts);
1403         }
1404     }
1405
1406     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1407
1408         private final long electionTimeOutIntervalMillis;
1409         private final int snapshotChunkSize;
1410
1411         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1412             super();
1413             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1414             this.snapshotChunkSize = snapshotChunkSize;
1415         }
1416
1417         @Override
1418         public FiniteDuration getElectionTimeOutInterval() {
1419             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1420         }
1421
1422         @Override
1423         public int getSnapshotChunkSize() {
1424             return snapshotChunkSize;
1425         }
1426     }
1427 }