Bug 2538: Remove redundant Augmentation checks and tests
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Assert;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftState;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
47 import scala.concurrent.duration.FiniteDuration;
48
49 public class LeaderTest extends AbstractRaftActorBehaviorTest {
50
51     private final ActorRef leaderActor =
52         getSystem().actorOf(Props.create(DoNothingActor.class));
53     private final ActorRef senderActor =
54         getSystem().actorOf(Props.create(DoNothingActor.class));
55
56     @Test
57     public void testHandleMessageForUnknownMessage() throws Exception {
58         new JavaTestKit(getSystem()) {{
59             Leader leader =
60                 new Leader(createActorContext());
61
62             // handle message should return the Leader state when it receives an
63             // unknown message
64             RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
65             Assert.assertTrue(behavior instanceof Leader);
66         }};
67     }
68
69     @Test
70     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
71         new JavaTestKit(getSystem()) {{
72             new Within(duration("1 seconds")) {
73                 @Override
74                 protected void run() {
75                     ActorRef followerActor = getTestActor();
76
77                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
78
79                     Map<String, String> peerAddresses = new HashMap<>();
80
81                     String followerId = "follower";
82                     peerAddresses.put(followerId, followerActor.path().toString());
83
84                     actorContext.setPeerAddresses(peerAddresses);
85
86                     long term = 1;
87                     actorContext.getTermInformation().update(term, "");
88
89                     Leader leader = new Leader(actorContext);
90
91                     // Leader should send an immediate heartbeat with no entries as follower is inactive.
92                     long lastIndex = actorContext.getReplicatedLog().lastIndex();
93                     AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
94                     assertEquals("getTerm", term, appendEntries.getTerm());
95                     assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
96                     assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
97                     assertEquals("Entries size", 0, appendEntries.getEntries().size());
98
99                     // The follower would normally reply - simulate that explicitly here.
100                     leader.handleMessage(followerActor, new AppendEntriesReply(
101                             followerId, term, true, lastIndex - 1, term));
102                     assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
103
104                     // Sleep for the heartbeat interval so AppendEntries is sent.
105                     Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
106                             getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
107
108                     leader.handleMessage(senderActor, new SendHeartBeat());
109
110                     appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
111                     assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
112                     assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
113                     assertEquals("Entries size", 1, appendEntries.getEntries().size());
114                     assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
115                     assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
116                 }
117             };
118         }};
119     }
120
121     @Test
122     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
123         new JavaTestKit(getSystem()) {{
124             new Within(duration("1 seconds")) {
125                 @Override
126                 protected void run() {
127                     ActorRef followerActor = getTestActor();
128
129                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
130
131                     Map<String, String> peerAddresses = new HashMap<>();
132
133                     String followerId = "follower";
134                     peerAddresses.put(followerId, followerActor.path().toString());
135
136                     actorContext.setPeerAddresses(peerAddresses);
137
138                     long term = 1;
139                     actorContext.getTermInformation().update(term, "");
140
141                     Leader leader = new Leader(actorContext);
142
143                     // Leader will send an immediate heartbeat - ignore it.
144                     expectMsgClass(duration("5 seconds"), AppendEntries.class);
145
146                     // The follower would normally reply - simulate that explicitly here.
147                     long lastIndex = actorContext.getReplicatedLog().lastIndex();
148                     leader.handleMessage(followerActor, new AppendEntriesReply(
149                             followerId, term, true, lastIndex, term));
150                     assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
151
152                     MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
153                     MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
154                             1, lastIndex + 1, payload);
155                     actorContext.getReplicatedLog().append(newEntry);
156                     RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
157                             new Replicate(null, null, newEntry));
158
159                     // State should not change
160                     assertTrue(raftBehavior instanceof Leader);
161
162                     AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
163                     assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
164                     assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
165                     assertEquals("Entries size", 1, appendEntries.getEntries().size());
166                     assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
167                     assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
168                     assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
169                 }
170             };
171         }};
172     }
173
174     @Test
175     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
176         new JavaTestKit(getSystem()) {{
177             new Within(duration("1 seconds")) {
178                 @Override
179                 protected void run() {
180
181                     ActorRef raftActor = getTestActor();
182
183                     MockRaftActorContext actorContext =
184                         new MockRaftActorContext("test", getSystem(), raftActor);
185
186                     actorContext.getReplicatedLog().removeFrom(0);
187
188                     actorContext.setReplicatedLog(
189                         new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
190                             .build());
191
192                     Leader leader = new Leader(actorContext);
193                     RaftActorBehavior raftBehavior = leader
194                         .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
195
196                     // State should not change
197                     assertTrue(raftBehavior instanceof Leader);
198
199                     assertEquals(1, actorContext.getCommitIndex());
200
201                     final String out =
202                         new ExpectMsg<String>(duration("1 seconds"),
203                             "match hint") {
204                             // do not put code outside this method, will run afterwards
205                             @Override
206                             protected String match(Object in) {
207                                 if (in instanceof ApplyState) {
208                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
209                                         return "match";
210                                     }
211                                     return null;
212                                 } else {
213                                     throw noMatch();
214                                 }
215                             }
216                         }.get(); // this extracts the received message
217
218                     assertEquals("match", out);
219
220                 }
221             };
222         }};
223     }
224
225     @Test
226     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
227         new JavaTestKit(getSystem()) {{
228             ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
229
230             Map<String, String> peerAddresses = new HashMap<>();
231             peerAddresses.put(followerActor.path().toString(),
232                 followerActor.path().toString());
233
234             MockRaftActorContext actorContext =
235                 (MockRaftActorContext) createActorContext(leaderActor);
236             actorContext.setPeerAddresses(peerAddresses);
237
238             Map<String, String> leadersSnapshot = new HashMap<>();
239             leadersSnapshot.put("1", "A");
240             leadersSnapshot.put("2", "B");
241             leadersSnapshot.put("3", "C");
242
243             //clears leaders log
244             actorContext.getReplicatedLog().removeFrom(0);
245
246             final int followersLastIndex = 2;
247             final int snapshotIndex = 3;
248             final int newEntryIndex = 4;
249             final int snapshotTerm = 1;
250             final int currentTerm = 2;
251
252             // set the snapshot variables in replicatedlog
253             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
254             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
255             actorContext.setCommitIndex(followersLastIndex);
256             //set follower timeout to 2 mins, helps during debugging
257             actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
258
259             MockLeader leader = new MockLeader(actorContext);
260
261             // new entry
262             ReplicatedLogImplEntry entry =
263                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
264                     new MockRaftActorContext.MockPayload("D"));
265
266             //update follower timestamp
267             leader.markFollowerActive(followerActor.path().toString());
268
269             ByteString bs = toByteString(leadersSnapshot);
270             leader.setSnapshot(Optional.of(bs));
271             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
272
273             //send first chunk and no InstallSnapshotReply received yet
274             leader.getFollowerToSnapshot().getNextChunk();
275             leader.getFollowerToSnapshot().incrementChunkIndex();
276
277             Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
278                 TimeUnit.MILLISECONDS);
279
280             leader.handleMessage(leaderActor, new SendHeartBeat());
281
282             AppendEntries aeproto = MessageCollectorActor.getFirstMatching(
283                 followerActor, AppendEntries.class);
284
285             assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
286                 "received", aeproto);
287
288             AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
289
290             assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
291
292             //InstallSnapshotReply received
293             leader.getFollowerToSnapshot().markSendStatus(true);
294
295             leader.handleMessage(senderActor, new SendHeartBeat());
296
297             InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.getFirstMatching(followerActor,
298                 InstallSnapshot.SERIALIZABLE_CLASS);
299
300             assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
301                 isproto);
302
303             InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
304
305             assertEquals(snapshotIndex, is.getLastIncludedIndex());
306
307         }};
308     }
309
310     @Test
311     public void testSendAppendEntriesSnapshotScenario() {
312         new JavaTestKit(getSystem()) {{
313
314             ActorRef followerActor = getTestActor();
315
316             Map<String, String> peerAddresses = new HashMap<>();
317             peerAddresses.put(followerActor.path().toString(),
318                 followerActor.path().toString());
319
320             MockRaftActorContext actorContext =
321                 (MockRaftActorContext) createActorContext(getRef());
322             actorContext.setPeerAddresses(peerAddresses);
323
324             Map<String, String> leadersSnapshot = new HashMap<>();
325             leadersSnapshot.put("1", "A");
326             leadersSnapshot.put("2", "B");
327             leadersSnapshot.put("3", "C");
328
329             //clears leaders log
330             actorContext.getReplicatedLog().removeFrom(0);
331
332             final int followersLastIndex = 2;
333             final int snapshotIndex = 3;
334             final int newEntryIndex = 4;
335             final int snapshotTerm = 1;
336             final int currentTerm = 2;
337
338             // set the snapshot variables in replicatedlog
339             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
340             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
341             actorContext.setCommitIndex(followersLastIndex);
342
343             Leader leader = new Leader(actorContext);
344
345             // new entry
346             ReplicatedLogImplEntry entry =
347                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
348                     new MockRaftActorContext.MockPayload("D"));
349
350             //update follower timestamp
351             leader.markFollowerActive(followerActor.path().toString());
352
353             Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
354                 TimeUnit.MILLISECONDS);
355
356             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
357             RaftActorBehavior raftBehavior = leader.handleMessage(
358                 senderActor, new Replicate(null, "state-id", entry));
359
360             assertTrue(raftBehavior instanceof Leader);
361
362             // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
363             Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
364                 @Override
365                 protected Boolean match(Object o) throws Exception {
366                     if (o instanceof InitiateInstallSnapshot) {
367                         return true;
368                     }
369                     return false;
370                 }
371             }.get();
372
373             boolean initiateInitiateInstallSnapshot = false;
374             for (Boolean b: matches) {
375                 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
376             }
377
378             assertTrue(initiateInitiateInstallSnapshot);
379         }};
380     }
381
382     @Test
383     public void testInitiateInstallSnapshot() throws Exception {
384         new JavaTestKit(getSystem()) {{
385
386             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
387
388             ActorRef followerActor = getTestActor();
389
390             Map<String, String> peerAddresses = new HashMap<>();
391             peerAddresses.put(followerActor.path().toString(),
392                 followerActor.path().toString());
393
394
395             MockRaftActorContext actorContext =
396                 (MockRaftActorContext) createActorContext(leaderActor);
397             actorContext.setPeerAddresses(peerAddresses);
398
399             Map<String, String> leadersSnapshot = new HashMap<>();
400             leadersSnapshot.put("1", "A");
401             leadersSnapshot.put("2", "B");
402             leadersSnapshot.put("3", "C");
403
404             //clears leaders log
405             actorContext.getReplicatedLog().removeFrom(0);
406
407             final int followersLastIndex = 2;
408             final int snapshotIndex = 3;
409             final int newEntryIndex = 4;
410             final int snapshotTerm = 1;
411             final int currentTerm = 2;
412
413             // set the snapshot variables in replicatedlog
414             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
415             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
416             actorContext.setLastApplied(3);
417             actorContext.setCommitIndex(followersLastIndex);
418
419             Leader leader = new Leader(actorContext);
420             // set the snapshot as absent and check if capture-snapshot is invoked.
421             leader.setSnapshot(Optional.<ByteString>absent());
422
423             // new entry
424             ReplicatedLogImplEntry entry =
425                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
426                     new MockRaftActorContext.MockPayload("D"));
427
428             actorContext.getReplicatedLog().append(entry);
429
430             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
431             RaftActorBehavior raftBehavior = leader.handleMessage(
432                 leaderActor, new InitiateInstallSnapshot());
433
434             CaptureSnapshot cs = MessageCollectorActor.
435                 getFirstMatching(leaderActor, CaptureSnapshot.class);
436
437             assertNotNull(cs);
438
439             assertTrue(cs.isInstallSnapshotInitiated());
440             assertEquals(3, cs.getLastAppliedIndex());
441             assertEquals(1, cs.getLastAppliedTerm());
442             assertEquals(4, cs.getLastIndex());
443             assertEquals(2, cs.getLastTerm());
444
445             // if an initiate is started again when first is in progress, it shouldnt initiate Capture
446             raftBehavior = leader.handleMessage(leaderActor, new InitiateInstallSnapshot());
447             List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
448             assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
449
450         }};
451     }
452
453     @Test
454     public void testInstallSnapshot() {
455         new JavaTestKit(getSystem()) {{
456
457             ActorRef followerActor = getTestActor();
458
459             Map<String, String> peerAddresses = new HashMap<>();
460             peerAddresses.put(followerActor.path().toString(),
461                 followerActor.path().toString());
462
463             MockRaftActorContext actorContext =
464                 (MockRaftActorContext) createActorContext();
465             actorContext.setPeerAddresses(peerAddresses);
466
467
468             Map<String, String> leadersSnapshot = new HashMap<>();
469             leadersSnapshot.put("1", "A");
470             leadersSnapshot.put("2", "B");
471             leadersSnapshot.put("3", "C");
472
473             //clears leaders log
474             actorContext.getReplicatedLog().removeFrom(0);
475
476             final int followersLastIndex = 2;
477             final int snapshotIndex = 3;
478             final int newEntryIndex = 4;
479             final int snapshotTerm = 1;
480             final int currentTerm = 2;
481
482             // set the snapshot variables in replicatedlog
483             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
484             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
485             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
486             actorContext.setCommitIndex(followersLastIndex);
487
488             Leader leader = new Leader(actorContext);
489
490             // Ignore initial heartbeat.
491             expectMsgClass(duration("5 seconds"), AppendEntries.class);
492
493             // new entry
494             ReplicatedLogImplEntry entry =
495                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
496                     new MockRaftActorContext.MockPayload("D"));
497
498             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
499                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
500
501             assertTrue(raftBehavior instanceof Leader);
502
503             // check if installsnapshot gets called with the correct values.
504             final String out =
505                 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
506                     // do not put code outside this method, will run afterwards
507                     @Override
508                     protected String match(Object in) {
509                         if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
510                             InstallSnapshot is = (InstallSnapshot)
511                                 SerializationUtils.fromSerializable(in);
512                             if (is.getData() == null) {
513                                 return "InstallSnapshot data is null";
514                             }
515                             if (is.getLastIncludedIndex() != snapshotIndex) {
516                                 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
517                             }
518                             if (is.getLastIncludedTerm() != snapshotTerm) {
519                                 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
520                             }
521                             if (is.getTerm() == currentTerm) {
522                                 return is.getTerm() + "!=" + currentTerm;
523                             }
524
525                             return "match";
526
527                         } else {
528                             return "message mismatch:" + in.getClass();
529                         }
530                     }
531                 }.get(); // this extracts the received message
532
533             assertEquals("match", out);
534         }};
535     }
536
537     @Test
538     public void testHandleInstallSnapshotReplyLastChunk() {
539         new JavaTestKit(getSystem()) {{
540
541             ActorRef followerActor = getTestActor();
542
543             Map<String, String> peerAddresses = new HashMap<>();
544             peerAddresses.put(followerActor.path().toString(),
545                 followerActor.path().toString());
546
547             final int followersLastIndex = 2;
548             final int snapshotIndex = 3;
549             final int newEntryIndex = 4;
550             final int snapshotTerm = 1;
551             final int currentTerm = 2;
552
553             MockRaftActorContext actorContext =
554                 (MockRaftActorContext) createActorContext();
555             actorContext.setPeerAddresses(peerAddresses);
556             actorContext.setCommitIndex(followersLastIndex);
557
558             MockLeader leader = new MockLeader(actorContext);
559
560             // Ignore initial heartbeat.
561             expectMsgClass(duration("5 seconds"), AppendEntries.class);
562
563             Map<String, String> leadersSnapshot = new HashMap<>();
564             leadersSnapshot.put("1", "A");
565             leadersSnapshot.put("2", "B");
566             leadersSnapshot.put("3", "C");
567
568             // set the snapshot variables in replicatedlog
569
570             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
571             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
572             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
573
574             ByteString bs = toByteString(leadersSnapshot);
575             leader.setSnapshot(Optional.of(bs));
576             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
577             while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
578                 leader.getFollowerToSnapshot().getNextChunk();
579                 leader.getFollowerToSnapshot().incrementChunkIndex();
580             }
581
582             //clears leaders log
583             actorContext.getReplicatedLog().removeFrom(0);
584
585             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
586                 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
587                     leader.getFollowerToSnapshot().getChunkIndex(), true));
588
589             assertTrue(raftBehavior instanceof Leader);
590
591             assertEquals(0, leader.followerSnapshotSize());
592             assertEquals(1, leader.followerLogSize());
593             assertNotNull(leader.getFollower(followerActor.path().toString()));
594             FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
595             assertEquals(snapshotIndex, fli.getMatchIndex());
596             assertEquals(snapshotIndex, fli.getMatchIndex());
597             assertEquals(snapshotIndex + 1, fli.getNextIndex());
598         }};
599     }
600     @Test
601     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
602         new JavaTestKit(getSystem()) {{
603
604             TestActorRef<MessageCollectorActor> followerActor =
605                 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
606
607             Map<String, String> peerAddresses = new HashMap<>();
608             peerAddresses.put("follower-reply",
609                 followerActor.path().toString());
610
611             final int followersLastIndex = 2;
612             final int snapshotIndex = 3;
613             final int snapshotTerm = 1;
614             final int currentTerm = 2;
615
616             MockRaftActorContext actorContext =
617                 (MockRaftActorContext) createActorContext();
618             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
619                 @Override
620                 public int getSnapshotChunkSize() {
621                     return 50;
622                 }
623             };
624             configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
625             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
626
627             actorContext.setConfigParams(configParams);
628             actorContext.setPeerAddresses(peerAddresses);
629             actorContext.setCommitIndex(followersLastIndex);
630
631             MockLeader leader = new MockLeader(actorContext);
632
633             Map<String, String> leadersSnapshot = new HashMap<>();
634             leadersSnapshot.put("1", "A");
635             leadersSnapshot.put("2", "B");
636             leadersSnapshot.put("3", "C");
637
638             // set the snapshot variables in replicatedlog
639             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
640             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
641             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
642
643             ByteString bs = toByteString(leadersSnapshot);
644             leader.setSnapshot(Optional.of(bs));
645
646             leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
647
648             List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
649                 InstallSnapshotMessages.InstallSnapshot.class);
650
651             assertEquals(1, objectList.size());
652
653             Object o = objectList.get(0);
654             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
655
656             InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
657
658             assertEquals(1, installSnapshot.getChunkIndex());
659             assertEquals(3, installSnapshot.getTotalChunks());
660
661             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
662                 "follower-reply", installSnapshot.getChunkIndex(), true));
663
664             objectList = MessageCollectorActor.getAllMatching(followerActor,
665                 InstallSnapshotMessages.InstallSnapshot.class);
666
667             assertEquals(2, objectList.size());
668
669             installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
670
671             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
672                 "follower-reply", installSnapshot.getChunkIndex(), true));
673
674             objectList = MessageCollectorActor.getAllMatching(followerActor,
675                 InstallSnapshotMessages.InstallSnapshot.class);
676
677             assertEquals(3, objectList.size());
678
679             installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
680
681             // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
682             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
683                 "follower-reply", installSnapshot.getChunkIndex(), true));
684
685             objectList = MessageCollectorActor.getAllMatching(followerActor,
686                 InstallSnapshotMessages.InstallSnapshot.class);
687
688             // Count should still stay at 3
689             assertEquals(3, objectList.size());
690         }};
691     }
692
693
694     @Test
695     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
696         new JavaTestKit(getSystem()) {{
697
698             TestActorRef<MessageCollectorActor> followerActor =
699                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
700
701             Map<String, String> peerAddresses = new HashMap<>();
702             peerAddresses.put(followerActor.path().toString(),
703                     followerActor.path().toString());
704
705             final int followersLastIndex = 2;
706             final int snapshotIndex = 3;
707             final int snapshotTerm = 1;
708             final int currentTerm = 2;
709
710             MockRaftActorContext actorContext =
711                     (MockRaftActorContext) createActorContext();
712
713             actorContext.setConfigParams(new DefaultConfigParamsImpl(){
714                 @Override
715                 public int getSnapshotChunkSize() {
716                     return 50;
717                 }
718             });
719             actorContext.setPeerAddresses(peerAddresses);
720             actorContext.setCommitIndex(followersLastIndex);
721
722             MockLeader leader = new MockLeader(actorContext);
723
724             Map<String, String> leadersSnapshot = new HashMap<>();
725             leadersSnapshot.put("1", "A");
726             leadersSnapshot.put("2", "B");
727             leadersSnapshot.put("3", "C");
728
729             // set the snapshot variables in replicatedlog
730             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
731             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
732             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
733
734             ByteString bs = toByteString(leadersSnapshot);
735             leader.setSnapshot(Optional.of(bs));
736
737             leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
738
739             MessageCollectorActor.getAllMatching(followerActor,
740                     InstallSnapshotMessages.InstallSnapshot.class);
741
742             InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
743                     followerActor, InstallSnapshotMessages.InstallSnapshot.class);
744             assertNotNull(installSnapshot);
745
746             assertEquals(1, installSnapshot.getChunkIndex());
747             assertEquals(3, installSnapshot.getTotalChunks());
748
749             followerActor.underlyingActor().clear();
750
751             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
752                 followerActor.path().toString(), -1, false));
753
754             Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
755                 TimeUnit.MILLISECONDS);
756
757             leader.handleMessage(leaderActor, new SendHeartBeat());
758
759             installSnapshot = MessageCollectorActor.getFirstMatching(
760                     followerActor, InstallSnapshotMessages.InstallSnapshot.class);
761             assertNotNull(installSnapshot);
762
763             assertEquals(1, installSnapshot.getChunkIndex());
764             assertEquals(3, installSnapshot.getTotalChunks());
765
766             followerActor.tell(PoisonPill.getInstance(), getRef());
767         }};
768     }
769
770     @Test
771     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
772         new JavaTestKit(getSystem()) {
773             {
774                 TestActorRef<MessageCollectorActor> followerActor =
775                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
776
777                 Map<String, String> peerAddresses = new HashMap<>();
778                 peerAddresses.put(followerActor.path().toString(),
779                         followerActor.path().toString());
780
781                 final int followersLastIndex = 2;
782                 final int snapshotIndex = 3;
783                 final int snapshotTerm = 1;
784                 final int currentTerm = 2;
785
786                 MockRaftActorContext actorContext =
787                         (MockRaftActorContext) createActorContext();
788
789                 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
790                     @Override
791                     public int getSnapshotChunkSize() {
792                         return 50;
793                     }
794                 });
795                 actorContext.setPeerAddresses(peerAddresses);
796                 actorContext.setCommitIndex(followersLastIndex);
797
798                 MockLeader leader = new MockLeader(actorContext);
799
800                 Map<String, String> leadersSnapshot = new HashMap<>();
801                 leadersSnapshot.put("1", "A");
802                 leadersSnapshot.put("2", "B");
803                 leadersSnapshot.put("3", "C");
804
805                 // set the snapshot variables in replicatedlog
806                 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
807                 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
808                 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
809
810                 ByteString bs = toByteString(leadersSnapshot);
811                 leader.setSnapshot(Optional.of(bs));
812
813                 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
814
815                 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
816                         followerActor, InstallSnapshotMessages.InstallSnapshot.class);
817                 assertNotNull(installSnapshot);
818
819                 assertEquals(1, installSnapshot.getChunkIndex());
820                 assertEquals(3, installSnapshot.getTotalChunks());
821                 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
822
823                 int hashCode = installSnapshot.getData().hashCode();
824
825                 followerActor.underlyingActor().clear();
826
827                 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
828
829                 installSnapshot = MessageCollectorActor.getFirstMatching(
830                         followerActor, InstallSnapshotMessages.InstallSnapshot.class);
831                 assertNotNull(installSnapshot);
832
833                 assertEquals(2, installSnapshot.getChunkIndex());
834                 assertEquals(3, installSnapshot.getTotalChunks());
835                 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
836
837                 followerActor.tell(PoisonPill.getInstance(), getRef());
838             }};
839     }
840
841     @Test
842     public void testFollowerToSnapshotLogic() {
843
844         MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
845
846         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
847             @Override
848             public int getSnapshotChunkSize() {
849                 return 50;
850             }
851         });
852
853         MockLeader leader = new MockLeader(actorContext);
854
855         Map<String, String> leadersSnapshot = new HashMap<>();
856         leadersSnapshot.put("1", "A");
857         leadersSnapshot.put("2", "B");
858         leadersSnapshot.put("3", "C");
859
860         ByteString bs = toByteString(leadersSnapshot);
861         byte[] barray = bs.toByteArray();
862
863         leader.createFollowerToSnapshot("followerId", bs);
864         assertEquals(bs.size(), barray.length);
865
866         int chunkIndex=0;
867         for (int i=0; i < barray.length; i = i + 50) {
868             int j = i + 50;
869             chunkIndex++;
870
871             if (i + 50 > barray.length) {
872                 j = barray.length;
873             }
874
875             ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
876             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
877             assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
878
879             leader.getFollowerToSnapshot().markSendStatus(true);
880             if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
881                 leader.getFollowerToSnapshot().incrementChunkIndex();
882             }
883         }
884
885         assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
886     }
887
888
889     @Override protected RaftActorBehavior createBehavior(
890         RaftActorContext actorContext) {
891         return new Leader(actorContext);
892     }
893
894     @Override protected RaftActorContext createActorContext() {
895         return createActorContext(leaderActor);
896     }
897
898     @Override
899     protected RaftActorContext createActorContext(ActorRef actorRef) {
900         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
901         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
902         configParams.setElectionTimeoutFactor(100000);
903         MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), actorRef);
904         context.setConfigParams(configParams);
905         return context;
906     }
907
908     private ByteString toByteString(Map<String, String> state) {
909         ByteArrayOutputStream b = null;
910         ObjectOutputStream o = null;
911         try {
912             try {
913                 b = new ByteArrayOutputStream();
914                 o = new ObjectOutputStream(b);
915                 o.writeObject(state);
916                 byte[] snapshotBytes = b.toByteArray();
917                 return ByteString.copyFrom(snapshotBytes);
918             } finally {
919                 if (o != null) {
920                     o.flush();
921                     o.close();
922                 }
923                 if (b != null) {
924                     b.close();
925                 }
926             }
927         } catch (IOException e) {
928             Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
929         }
930         return null;
931     }
932
933     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
934         AbstractRaftActorBehavior behavior;
935
936         @Override public void onReceive(Object message) throws Exception {
937             if(behavior != null) {
938                 behavior.handleMessage(sender(), message);
939             }
940
941             super.onReceive(message);
942         }
943
944         public static Props props() {
945             return Props.create(ForwardMessageToBehaviorActor.class);
946         }
947     }
948
949     @Test
950     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
951         new JavaTestKit(getSystem()) {{
952             TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
953                     Props.create(ForwardMessageToBehaviorActor.class));
954
955             MockRaftActorContext leaderActorContext =
956                     new MockRaftActorContext("leader", getSystem(), leaderActor);
957
958             TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
959                     ForwardMessageToBehaviorActor.props());
960
961             MockRaftActorContext followerActorContext =
962                     new MockRaftActorContext("follower", getSystem(), followerActor);
963
964             Follower follower = new Follower(followerActorContext);
965             followerActor.underlyingActor().behavior = follower;
966
967             Map<String, String> peerAddresses = new HashMap<>();
968             peerAddresses.put("follower", followerActor.path().toString());
969
970             leaderActorContext.setPeerAddresses(peerAddresses);
971
972             leaderActorContext.getReplicatedLog().removeFrom(0);
973
974             //create 3 entries
975             leaderActorContext.setReplicatedLog(
976                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
977
978             leaderActorContext.setCommitIndex(1);
979
980             followerActorContext.getReplicatedLog().removeFrom(0);
981
982             // follower too has the exact same log entries and has the same commit index
983             followerActorContext.setReplicatedLog(
984                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
985
986             followerActorContext.setCommitIndex(1);
987
988             Leader leader = new Leader(leaderActorContext);
989
990             AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
991             assertNotNull(appendEntries);
992
993             assertEquals(1, appendEntries.getLeaderCommit());
994             assertEquals(0, appendEntries.getEntries().size());
995             assertEquals(0, appendEntries.getPrevLogIndex());
996
997             AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
998                     leaderActor, AppendEntriesReply.class);
999             assertNotNull(appendEntriesReply);
1000
1001             assertEquals(2, appendEntriesReply.getLogLastIndex());
1002             assertEquals(1, appendEntriesReply.getLogLastTerm());
1003
1004             // follower returns its next index
1005             assertEquals(2, appendEntriesReply.getLogLastIndex());
1006             assertEquals(1, appendEntriesReply.getLogLastTerm());
1007         }};
1008     }
1009
1010
1011     @Test
1012     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1013         new JavaTestKit(getSystem()) {{
1014             TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
1015                     Props.create(ForwardMessageToBehaviorActor.class));
1016
1017             MockRaftActorContext leaderActorContext =
1018                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1019
1020             TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
1021                     ForwardMessageToBehaviorActor.props());
1022
1023             MockRaftActorContext followerActorContext =
1024                     new MockRaftActorContext("follower", getSystem(), followerActor);
1025
1026             Follower follower = new Follower(followerActorContext);
1027             followerActor.underlyingActor().behavior = follower;
1028
1029             Map<String, String> peerAddresses = new HashMap<>();
1030             peerAddresses.put("follower", followerActor.path().toString());
1031
1032             leaderActorContext.setPeerAddresses(peerAddresses);
1033
1034             leaderActorContext.getReplicatedLog().removeFrom(0);
1035
1036             leaderActorContext.setReplicatedLog(
1037                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1038
1039             leaderActorContext.setCommitIndex(1);
1040
1041             followerActorContext.getReplicatedLog().removeFrom(0);
1042
1043             followerActorContext.setReplicatedLog(
1044                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1045
1046             // follower has the same log entries but its commit index > leaders commit index
1047             followerActorContext.setCommitIndex(2);
1048
1049             Leader leader = new Leader(leaderActorContext);
1050
1051             // Initial heartbeat
1052             AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1053             assertNotNull(appendEntries);
1054
1055             assertEquals(1, appendEntries.getLeaderCommit());
1056             assertEquals(0, appendEntries.getEntries().size());
1057             assertEquals(0, appendEntries.getPrevLogIndex());
1058
1059             AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
1060                     leaderActor, AppendEntriesReply.class);
1061             assertNotNull(appendEntriesReply);
1062
1063             assertEquals(2, appendEntriesReply.getLogLastIndex());
1064             assertEquals(1, appendEntriesReply.getLogLastTerm());
1065
1066             leaderActor.underlyingActor().behavior = leader;
1067             leader.handleMessage(followerActor, appendEntriesReply);
1068
1069             leaderActor.underlyingActor().clear();
1070             followerActor.underlyingActor().clear();
1071
1072             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1073                     TimeUnit.MILLISECONDS);
1074
1075             leader.handleMessage(leaderActor, new SendHeartBeat());
1076
1077             appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1078             assertNotNull(appendEntries);
1079
1080             assertEquals(1, appendEntries.getLeaderCommit());
1081             assertEquals(0, appendEntries.getEntries().size());
1082             assertEquals(2, appendEntries.getPrevLogIndex());
1083
1084             appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
1085             assertNotNull(appendEntriesReply);
1086
1087             assertEquals(2, appendEntriesReply.getLogLastIndex());
1088             assertEquals(1, appendEntriesReply.getLogLastTerm());
1089
1090             assertEquals(1, followerActorContext.getCommitIndex());
1091         }};
1092     }
1093
1094     @Test
1095     public void testHandleAppendEntriesReplyFailure(){
1096         new JavaTestKit(getSystem()) {
1097             {
1098
1099                 ActorRef leaderActor =
1100                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1101
1102                 ActorRef followerActor =
1103                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1104
1105
1106                 MockRaftActorContext leaderActorContext =
1107                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1108
1109                 Map<String, String> peerAddresses = new HashMap<>();
1110                 peerAddresses.put("follower-1",
1111                     followerActor.path().toString());
1112
1113                 leaderActorContext.setPeerAddresses(peerAddresses);
1114
1115                 Leader leader = new Leader(leaderActorContext);
1116
1117                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1118
1119                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1120
1121                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1122
1123             }};
1124     }
1125
1126     @Test
1127     public void testHandleAppendEntriesReplySuccess() throws Exception {
1128         new JavaTestKit(getSystem()) {
1129             {
1130
1131                 ActorRef leaderActor =
1132                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1133
1134                 ActorRef followerActor =
1135                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1136
1137
1138                 MockRaftActorContext leaderActorContext =
1139                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1140
1141                 leaderActorContext.setReplicatedLog(
1142                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1143
1144                 Map<String, String> peerAddresses = new HashMap<>();
1145                 peerAddresses.put("follower-1",
1146                     followerActor.path().toString());
1147
1148                 leaderActorContext.setPeerAddresses(peerAddresses);
1149                 leaderActorContext.setCommitIndex(1);
1150                 leaderActorContext.setLastApplied(1);
1151                 leaderActorContext.getTermInformation().update(1, "leader");
1152
1153                 Leader leader = new Leader(leaderActorContext);
1154
1155                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1156
1157                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1158
1159                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1160
1161                 assertEquals(2, leaderActorContext.getCommitIndex());
1162
1163                 ApplyLogEntries applyLogEntries =
1164                     MessageCollectorActor.getFirstMatching(leaderActor,
1165                     ApplyLogEntries.class);
1166
1167                 assertNotNull(applyLogEntries);
1168
1169                 assertEquals(2, leaderActorContext.getLastApplied());
1170
1171                 assertEquals(2, applyLogEntries.getToIndex());
1172
1173                 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1174                     ApplyState.class);
1175
1176                 assertEquals(1,applyStateList.size());
1177
1178                 ApplyState applyState = (ApplyState) applyStateList.get(0);
1179
1180                 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1181
1182             }};
1183     }
1184
1185     @Test
1186     public void testHandleAppendEntriesReplyUnknownFollower(){
1187         new JavaTestKit(getSystem()) {
1188             {
1189
1190                 ActorRef leaderActor =
1191                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1192
1193                 MockRaftActorContext leaderActorContext =
1194                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1195
1196                 Leader leader = new Leader(leaderActorContext);
1197
1198                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1199
1200                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1201
1202                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1203
1204             }};
1205     }
1206
1207     @Test
1208     public void testHandleRequestVoteReply(){
1209         new JavaTestKit(getSystem()) {
1210             {
1211
1212                 ActorRef leaderActor =
1213                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1214
1215                 MockRaftActorContext leaderActorContext =
1216                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1217
1218                 Leader leader = new Leader(leaderActorContext);
1219
1220                 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1221
1222                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1223
1224                 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1225
1226                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1227             }};
1228     }
1229
1230     @Test
1231     public void testIsolatedLeaderCheckNoFollowers() {
1232         new JavaTestKit(getSystem()) {{
1233             ActorRef leaderActor = getTestActor();
1234
1235             MockRaftActorContext leaderActorContext =
1236                 new MockRaftActorContext("leader", getSystem(), leaderActor);
1237
1238             Map<String, String> peerAddresses = new HashMap<>();
1239             leaderActorContext.setPeerAddresses(peerAddresses);
1240
1241             Leader leader = new Leader(leaderActorContext);
1242             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1243             Assert.assertTrue(behavior instanceof Leader);
1244         }};
1245     }
1246
1247     @Test
1248     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1249         new JavaTestKit(getSystem()) {{
1250
1251             ActorRef followerActor1 = getTestActor();
1252             ActorRef followerActor2 = getTestActor();
1253
1254             MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1255
1256             Map<String, String> peerAddresses = new HashMap<>();
1257             peerAddresses.put("follower-1", followerActor1.path().toString());
1258             peerAddresses.put("follower-2", followerActor2.path().toString());
1259
1260             leaderActorContext.setPeerAddresses(peerAddresses);
1261
1262             Leader leader = new Leader(leaderActorContext);
1263             leader.stopIsolatedLeaderCheckSchedule();
1264
1265             leader.markFollowerActive("follower-1");
1266             leader.markFollowerActive("follower-2");
1267             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1268             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1269                 behavior instanceof Leader);
1270
1271             // kill 1 follower and verify if that got killed
1272             final JavaTestKit probe = new JavaTestKit(getSystem());
1273             probe.watch(followerActor1);
1274             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1275             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1276             assertEquals(termMsg1.getActor(), followerActor1);
1277
1278             leader.markFollowerInActive("follower-1");
1279             leader.markFollowerActive("follower-2");
1280             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1281             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1282                 behavior instanceof Leader);
1283
1284             // kill 2nd follower and leader should change to Isolated leader
1285             followerActor2.tell(PoisonPill.getInstance(), null);
1286             probe.watch(followerActor2);
1287             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1288             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1289             assertEquals(termMsg2.getActor(), followerActor2);
1290
1291             leader.markFollowerInActive("follower-2");
1292             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1293             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1294                 behavior instanceof IsolatedLeader);
1295
1296         }};
1297     }
1298
1299
1300     @Test
1301     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1302         new JavaTestKit(getSystem()) {{
1303             TestActorRef<MessageCollectorActor> leaderActor = TestActorRef.create(getSystem(),
1304                     Props.create(MessageCollectorActor.class));
1305
1306             MockRaftActorContext leaderActorContext =
1307                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1308
1309             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1310             //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1311             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1312
1313             leaderActorContext.setConfigParams(configParams);
1314
1315             TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
1316                     ForwardMessageToBehaviorActor.props());
1317
1318             MockRaftActorContext followerActorContext =
1319                     new MockRaftActorContext("follower-reply", getSystem(), followerActor);
1320
1321             followerActorContext.setConfigParams(configParams);
1322
1323             Follower follower = new Follower(followerActorContext);
1324             followerActor.underlyingActor().behavior = follower;
1325
1326             Map<String, String> peerAddresses = new HashMap<>();
1327             peerAddresses.put("follower-reply",
1328                     followerActor.path().toString());
1329
1330             leaderActorContext.setPeerAddresses(peerAddresses);
1331
1332             leaderActorContext.getReplicatedLog().removeFrom(0);
1333             leaderActorContext.setCommitIndex(-1);
1334             leaderActorContext.setLastApplied(-1);
1335
1336             followerActorContext.getReplicatedLog().removeFrom(0);
1337             followerActorContext.setCommitIndex(-1);
1338             followerActorContext.setLastApplied(-1);
1339
1340             Leader leader = new Leader(leaderActorContext);
1341
1342             AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
1343                     leaderActor, AppendEntriesReply.class);
1344             assertNotNull(appendEntriesReply);
1345             System.out.println("appendEntriesReply: "+appendEntriesReply);
1346             leader.handleMessage(followerActor, appendEntriesReply);
1347
1348             // Clear initial heartbeat messages
1349
1350             leaderActor.underlyingActor().clear();
1351             followerActor.underlyingActor().clear();
1352
1353             // create 3 entries
1354             leaderActorContext.setReplicatedLog(
1355                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1356             leaderActorContext.setCommitIndex(1);
1357             leaderActorContext.setLastApplied(1);
1358
1359             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1360                     TimeUnit.MILLISECONDS);
1361
1362             leader.handleMessage(leaderActor, new SendHeartBeat());
1363
1364             AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1365             assertNotNull(appendEntries);
1366
1367             // Should send first log entry
1368             assertEquals(1, appendEntries.getLeaderCommit());
1369             assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1370             assertEquals(-1, appendEntries.getPrevLogIndex());
1371
1372             appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
1373             assertNotNull(appendEntriesReply);
1374
1375             assertEquals(1, appendEntriesReply.getLogLastTerm());
1376             assertEquals(0, appendEntriesReply.getLogLastIndex());
1377
1378             followerActor.underlyingActor().clear();
1379
1380             leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1381
1382             appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1383             assertNotNull(appendEntries);
1384
1385             // Should send second log entry
1386             assertEquals(1, appendEntries.getLeaderCommit());
1387             assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1388         }};
1389     }
1390
1391     class MockLeader extends Leader {
1392
1393         FollowerToSnapshot fts;
1394
1395         public MockLeader(RaftActorContext context){
1396             super(context);
1397         }
1398
1399         public FollowerToSnapshot getFollowerToSnapshot() {
1400             return fts;
1401         }
1402
1403         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1404             fts = new FollowerToSnapshot(bs);
1405             setFollowerSnapshot(followerId, fts);
1406         }
1407     }
1408
1409     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1410
1411         private final long electionTimeOutIntervalMillis;
1412         private final int snapshotChunkSize;
1413
1414         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1415             super();
1416             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1417             this.snapshotChunkSize = snapshotChunkSize;
1418         }
1419
1420         @Override
1421         public FiniteDuration getElectionTimeOutInterval() {
1422             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1423         }
1424
1425         @Override
1426         public int getSnapshotChunkSize() {
1427             return snapshotChunkSize;
1428         }
1429     }
1430 }