Bug-2692:Avoid fake snapshot during initiate snapshot
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import akka.actor.ActorRef;
4 import akka.actor.PoisonPill;
5 import akka.actor.Props;
6 import akka.actor.Terminated;
7 import akka.testkit.JavaTestKit;
8 import akka.testkit.TestActorRef;
9 import com.google.common.base.Optional;
10 import com.google.common.util.concurrent.Uninterruptibles;
11 import com.google.protobuf.ByteString;
12 import java.io.ByteArrayOutputStream;
13 import java.io.IOException;
14 import java.io.ObjectOutputStream;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.concurrent.TimeUnit;
19 import org.junit.Assert;
20 import org.junit.Test;
21 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
22 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
23 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
24 import org.opendaylight.controller.cluster.raft.RaftActorContext;
25 import org.opendaylight.controller.cluster.raft.RaftState;
26 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
27 import org.opendaylight.controller.cluster.raft.SerializationUtils;
28 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
30 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
31 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
32 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
33 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
34 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
36 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
37 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
38 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
40 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
41 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
42 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
43 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
44 import scala.concurrent.duration.FiniteDuration;
45
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertNotNull;
48 import static org.junit.Assert.assertTrue;
49
50 public class LeaderTest extends AbstractRaftActorBehaviorTest {
51
52     private final ActorRef leaderActor =
53         getSystem().actorOf(Props.create(DoNothingActor.class));
54     private final ActorRef senderActor =
55         getSystem().actorOf(Props.create(DoNothingActor.class));
56
57     @Test
58     public void testHandleMessageForUnknownMessage() throws Exception {
59         new JavaTestKit(getSystem()) {{
60             Leader leader =
61                 new Leader(createActorContext());
62
63             // handle message should return the Leader state when it receives an
64             // unknown message
65             RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
66             Assert.assertTrue(behavior instanceof Leader);
67         }};
68     }
69
70     @Test
71     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
72         new JavaTestKit(getSystem()) {{
73
74             new Within(duration("1 seconds")) {
75                 @Override
76                 protected void run() {
77
78                     ActorRef followerActor = getTestActor();
79
80                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
81
82                     Map<String, String> peerAddresses = new HashMap<>();
83
84                     peerAddresses.put(followerActor.path().toString(),
85                         followerActor.path().toString());
86
87                     actorContext.setPeerAddresses(peerAddresses);
88
89                     Leader leader = new Leader(actorContext);
90                     leader.handleMessage(senderActor, new SendHeartBeat());
91
92                     final String out =
93                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
94                             // do not put code outside this method, will run afterwards
95                             @Override
96                             protected String match(Object in) {
97                                 Object msg = fromSerializableMessage(in);
98                                 if (msg instanceof AppendEntries) {
99                                     if (((AppendEntries)msg).getTerm() == 0) {
100                                         return "match";
101                                     }
102                                     return null;
103                                 } else {
104                                     throw noMatch();
105                                 }
106                             }
107                         }.get(); // this extracts the received message
108
109                     assertEquals("match", out);
110
111                 }
112             };
113         }};
114     }
115
116     @Test
117     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
118         new JavaTestKit(getSystem()) {{
119
120             new Within(duration("1 seconds")) {
121                 @Override
122                 protected void run() {
123
124                     ActorRef followerActor = getTestActor();
125
126                     MockRaftActorContext actorContext =
127                         (MockRaftActorContext) createActorContext();
128
129                     Map<String, String> peerAddresses = new HashMap<>();
130
131                     peerAddresses.put(followerActor.path().toString(),
132                             followerActor.path().toString());
133
134                     actorContext.setPeerAddresses(peerAddresses);
135
136                     Leader leader = new Leader(actorContext);
137                     RaftActorBehavior raftBehavior = leader
138                         .handleMessage(senderActor, new Replicate(null, null,
139                             new MockRaftActorContext.MockReplicatedLogEntry(1,
140                                 100,
141                                 new MockRaftActorContext.MockPayload("foo"))
142                         ));
143
144                     // State should not change
145                     assertTrue(raftBehavior instanceof Leader);
146
147                     final String out =
148                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
149                             // do not put code outside this method, will run afterwards
150                             @Override
151                             protected String match(Object in) {
152                                 Object msg = fromSerializableMessage(in);
153                                 if (msg instanceof AppendEntries) {
154                                     if (((AppendEntries)msg).getTerm() == 0) {
155                                         return "match";
156                                     }
157                                     return null;
158                                 } else {
159                                     throw noMatch();
160                                 }
161                             }
162                         }.get(); // this extracts the received message
163
164                     assertEquals("match", out);
165                 }
166             };
167         }};
168     }
169
170     @Test
171     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
172         new JavaTestKit(getSystem()) {{
173
174             new Within(duration("1 seconds")) {
175                 @Override
176                 protected void run() {
177
178                     ActorRef raftActor = getTestActor();
179
180                     MockRaftActorContext actorContext =
181                         new MockRaftActorContext("test", getSystem(), raftActor);
182
183                     actorContext.getReplicatedLog().removeFrom(0);
184
185                     actorContext.setReplicatedLog(
186                         new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
187                             .build());
188
189                     Leader leader = new Leader(actorContext);
190                     RaftActorBehavior raftBehavior = leader
191                         .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
192
193                     // State should not change
194                     assertTrue(raftBehavior instanceof Leader);
195
196                     assertEquals(1, actorContext.getCommitIndex());
197
198                     final String out =
199                         new ExpectMsg<String>(duration("1 seconds"),
200                             "match hint") {
201                             // do not put code outside this method, will run afterwards
202                             @Override
203                             protected String match(Object in) {
204                                 if (in instanceof ApplyState) {
205                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
206                                         return "match";
207                                     }
208                                     return null;
209                                 } else {
210                                     throw noMatch();
211                                 }
212                             }
213                         }.get(); // this extracts the received message
214
215                     assertEquals("match", out);
216
217                 }
218             };
219         }};
220     }
221
222     @Test
223     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
224         new JavaTestKit(getSystem()) {{
225             ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
226
227             Map<String, String> peerAddresses = new HashMap<>();
228             peerAddresses.put(followerActor.path().toString(),
229                 followerActor.path().toString());
230
231             MockRaftActorContext actorContext =
232                 (MockRaftActorContext) createActorContext(leaderActor);
233             actorContext.setPeerAddresses(peerAddresses);
234
235             Map<String, String> leadersSnapshot = new HashMap<>();
236             leadersSnapshot.put("1", "A");
237             leadersSnapshot.put("2", "B");
238             leadersSnapshot.put("3", "C");
239
240             //clears leaders log
241             actorContext.getReplicatedLog().removeFrom(0);
242
243             final int followersLastIndex = 2;
244             final int snapshotIndex = 3;
245             final int newEntryIndex = 4;
246             final int snapshotTerm = 1;
247             final int currentTerm = 2;
248
249             // set the snapshot variables in replicatedlog
250             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
251             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
252             actorContext.setCommitIndex(followersLastIndex);
253             //set follower timeout to 2 mins, helps during debugging
254             actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
255
256             MockLeader leader = new MockLeader(actorContext);
257
258             // new entry
259             ReplicatedLogImplEntry entry =
260                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
261                     new MockRaftActorContext.MockPayload("D"));
262
263             //update follower timestamp
264             leader.markFollowerActive(followerActor.path().toString());
265
266             ByteString bs = toByteString(leadersSnapshot);
267             leader.setSnapshot(Optional.of(bs));
268             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
269
270             //send first chunk and no InstallSnapshotReply received yet
271             leader.getFollowerToSnapshot().getNextChunk();
272             leader.getFollowerToSnapshot().incrementChunkIndex();
273
274             leader.handleMessage(leaderActor, new SendHeartBeat());
275
276             AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
277                 followerActor, AppendEntries.class);
278
279             assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
280                 "received", aeproto);
281
282             AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
283
284             assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
285
286             //InstallSnapshotReply received
287             leader.getFollowerToSnapshot().markSendStatus(true);
288
289             leader.handleMessage(senderActor, new SendHeartBeat());
290
291             InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
292                 MessageCollectorActor.getFirstMatching(followerActor,
293                     InstallSnapshot.SERIALIZABLE_CLASS);
294
295             assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
296                 isproto);
297
298             InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
299
300             assertEquals(snapshotIndex, is.getLastIncludedIndex());
301
302         }};
303     }
304
305     @Test
306     public void testSendAppendEntriesSnapshotScenario() {
307         new JavaTestKit(getSystem()) {{
308
309             ActorRef followerActor = getTestActor();
310
311             Map<String, String> peerAddresses = new HashMap<>();
312             peerAddresses.put(followerActor.path().toString(),
313                 followerActor.path().toString());
314
315             MockRaftActorContext actorContext =
316                 (MockRaftActorContext) createActorContext(getRef());
317             actorContext.setPeerAddresses(peerAddresses);
318
319             Map<String, String> leadersSnapshot = new HashMap<>();
320             leadersSnapshot.put("1", "A");
321             leadersSnapshot.put("2", "B");
322             leadersSnapshot.put("3", "C");
323
324             //clears leaders log
325             actorContext.getReplicatedLog().removeFrom(0);
326
327             final int followersLastIndex = 2;
328             final int snapshotIndex = 3;
329             final int newEntryIndex = 4;
330             final int snapshotTerm = 1;
331             final int currentTerm = 2;
332
333             // set the snapshot variables in replicatedlog
334             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
335             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
336             actorContext.setCommitIndex(followersLastIndex);
337
338             Leader leader = new Leader(actorContext);
339
340             // new entry
341             ReplicatedLogImplEntry entry =
342                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
343                     new MockRaftActorContext.MockPayload("D"));
344
345             //update follower timestamp
346             leader.markFollowerActive(followerActor.path().toString());
347
348             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
349             RaftActorBehavior raftBehavior = leader.handleMessage(
350                 senderActor, new Replicate(null, "state-id", entry));
351
352             assertTrue(raftBehavior instanceof Leader);
353
354             // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
355             Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
356                 @Override
357                 protected Boolean match(Object o) throws Exception {
358                     if (o instanceof InitiateInstallSnapshot) {
359                         return true;
360                     }
361                     return false;
362                 }
363             }.get();
364
365             boolean initiateInitiateInstallSnapshot = false;
366             for (Boolean b: matches) {
367                 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
368             }
369
370             assertTrue(initiateInitiateInstallSnapshot);
371         }};
372     }
373
374     @Test
375     public void testInitiateInstallSnapshot() throws Exception {
376         new JavaTestKit(getSystem()) {{
377
378             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
379
380             ActorRef followerActor = getTestActor();
381
382             Map<String, String> peerAddresses = new HashMap<>();
383             peerAddresses.put(followerActor.path().toString(),
384                 followerActor.path().toString());
385
386
387             MockRaftActorContext actorContext =
388                 (MockRaftActorContext) createActorContext(leaderActor);
389             actorContext.setPeerAddresses(peerAddresses);
390
391             Map<String, String> leadersSnapshot = new HashMap<>();
392             leadersSnapshot.put("1", "A");
393             leadersSnapshot.put("2", "B");
394             leadersSnapshot.put("3", "C");
395
396             //clears leaders log
397             actorContext.getReplicatedLog().removeFrom(0);
398
399             final int followersLastIndex = 2;
400             final int snapshotIndex = 3;
401             final int newEntryIndex = 4;
402             final int snapshotTerm = 1;
403             final int currentTerm = 2;
404
405             // set the snapshot variables in replicatedlog
406             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
407             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
408             actorContext.setLastApplied(3);
409             actorContext.setCommitIndex(followersLastIndex);
410
411             Leader leader = new Leader(actorContext);
412             // set the snapshot as absent and check if capture-snapshot is invoked.
413             leader.setSnapshot(Optional.<ByteString>absent());
414
415             // new entry
416             ReplicatedLogImplEntry entry =
417                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
418                     new MockRaftActorContext.MockPayload("D"));
419
420             actorContext.getReplicatedLog().append(entry);
421
422             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
423             RaftActorBehavior raftBehavior = leader.handleMessage(
424                 leaderActor, new InitiateInstallSnapshot());
425
426             CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
427                 getFirstMatching(leaderActor, CaptureSnapshot.class);
428
429             assertNotNull(cs);
430
431             assertTrue(cs.isInstallSnapshotInitiated());
432             assertEquals(3, cs.getLastAppliedIndex());
433             assertEquals(1, cs.getLastAppliedTerm());
434             assertEquals(4, cs.getLastIndex());
435             assertEquals(2, cs.getLastTerm());
436
437             // if an initiate is started again when first is in progress, it shouldnt initiate Capture
438             raftBehavior = leader.handleMessage(leaderActor, new InitiateInstallSnapshot());
439             List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
440             assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
441
442         }};
443     }
444
445     @Test
446     public void testInstallSnapshot() {
447         new JavaTestKit(getSystem()) {{
448
449             ActorRef followerActor = getTestActor();
450
451             Map<String, String> peerAddresses = new HashMap<>();
452             peerAddresses.put(followerActor.path().toString(),
453                 followerActor.path().toString());
454
455             MockRaftActorContext actorContext =
456                 (MockRaftActorContext) createActorContext();
457             actorContext.setPeerAddresses(peerAddresses);
458
459
460             Map<String, String> leadersSnapshot = new HashMap<>();
461             leadersSnapshot.put("1", "A");
462             leadersSnapshot.put("2", "B");
463             leadersSnapshot.put("3", "C");
464
465             //clears leaders log
466             actorContext.getReplicatedLog().removeFrom(0);
467
468             final int followersLastIndex = 2;
469             final int snapshotIndex = 3;
470             final int newEntryIndex = 4;
471             final int snapshotTerm = 1;
472             final int currentTerm = 2;
473
474             // set the snapshot variables in replicatedlog
475             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
476             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
477             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
478             actorContext.setCommitIndex(followersLastIndex);
479
480             Leader leader = new Leader(actorContext);
481
482             // new entry
483             ReplicatedLogImplEntry entry =
484                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
485                     new MockRaftActorContext.MockPayload("D"));
486
487             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
488                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
489
490             assertTrue(raftBehavior instanceof Leader);
491
492             // check if installsnapshot gets called with the correct values.
493             final String out =
494                 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
495                     // do not put code outside this method, will run afterwards
496                     @Override
497                     protected String match(Object in) {
498                         if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
499                             InstallSnapshot is = (InstallSnapshot)
500                                 SerializationUtils.fromSerializable(in);
501                             if (is.getData() == null) {
502                                 return "InstallSnapshot data is null";
503                             }
504                             if (is.getLastIncludedIndex() != snapshotIndex) {
505                                 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
506                             }
507                             if (is.getLastIncludedTerm() != snapshotTerm) {
508                                 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
509                             }
510                             if (is.getTerm() == currentTerm) {
511                                 return is.getTerm() + "!=" + currentTerm;
512                             }
513
514                             return "match";
515
516                         } else {
517                             return "message mismatch:" + in.getClass();
518                         }
519                     }
520                 }.get(); // this extracts the received message
521
522             assertEquals("match", out);
523         }};
524     }
525
526     @Test
527     public void testHandleInstallSnapshotReplyLastChunk() {
528         new JavaTestKit(getSystem()) {{
529
530             ActorRef followerActor = getTestActor();
531
532             Map<String, String> peerAddresses = new HashMap<>();
533             peerAddresses.put(followerActor.path().toString(),
534                 followerActor.path().toString());
535
536             final int followersLastIndex = 2;
537             final int snapshotIndex = 3;
538             final int newEntryIndex = 4;
539             final int snapshotTerm = 1;
540             final int currentTerm = 2;
541
542             MockRaftActorContext actorContext =
543                 (MockRaftActorContext) createActorContext();
544             actorContext.setPeerAddresses(peerAddresses);
545             actorContext.setCommitIndex(followersLastIndex);
546
547             MockLeader leader = new MockLeader(actorContext);
548
549             Map<String, String> leadersSnapshot = new HashMap<>();
550             leadersSnapshot.put("1", "A");
551             leadersSnapshot.put("2", "B");
552             leadersSnapshot.put("3", "C");
553
554             // set the snapshot variables in replicatedlog
555
556             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
557             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
558             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
559
560             ByteString bs = toByteString(leadersSnapshot);
561             leader.setSnapshot(Optional.of(bs));
562             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
563             while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
564                 leader.getFollowerToSnapshot().getNextChunk();
565                 leader.getFollowerToSnapshot().incrementChunkIndex();
566             }
567
568             //clears leaders log
569             actorContext.getReplicatedLog().removeFrom(0);
570
571             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
572                 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
573                     leader.getFollowerToSnapshot().getChunkIndex(), true));
574
575             assertTrue(raftBehavior instanceof Leader);
576
577             assertEquals(0, leader.followerSnapshotSize());
578             assertEquals(1, leader.followerLogSize());
579             assertNotNull(leader.getFollower(followerActor.path().toString()));
580             FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
581             assertEquals(snapshotIndex, fli.getMatchIndex());
582             assertEquals(snapshotIndex, fli.getMatchIndex());
583             assertEquals(snapshotIndex + 1, fli.getNextIndex());
584         }};
585     }
586
587     @Test
588     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
589         new JavaTestKit(getSystem()) {{
590
591             TestActorRef<MessageCollectorActor> followerActor =
592                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
593
594             Map<String, String> peerAddresses = new HashMap<>();
595             peerAddresses.put(followerActor.path().toString(),
596                     followerActor.path().toString());
597
598             final int followersLastIndex = 2;
599             final int snapshotIndex = 3;
600             final int snapshotTerm = 1;
601             final int currentTerm = 2;
602
603             MockRaftActorContext actorContext =
604                     (MockRaftActorContext) createActorContext();
605
606             actorContext.setConfigParams(new DefaultConfigParamsImpl(){
607                 @Override
608                 public int getSnapshotChunkSize() {
609                     return 50;
610                 }
611             });
612             actorContext.setPeerAddresses(peerAddresses);
613             actorContext.setCommitIndex(followersLastIndex);
614
615             MockLeader leader = new MockLeader(actorContext);
616
617             Map<String, String> leadersSnapshot = new HashMap<>();
618             leadersSnapshot.put("1", "A");
619             leadersSnapshot.put("2", "B");
620             leadersSnapshot.put("3", "C");
621
622             // set the snapshot variables in replicatedlog
623             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
624             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
625             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
626
627             ByteString bs = toByteString(leadersSnapshot);
628             leader.setSnapshot(Optional.of(bs));
629
630             leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
631
632             Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
633
634             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
635
636             InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
637
638             assertEquals(1, installSnapshot.getChunkIndex());
639             assertEquals(3, installSnapshot.getTotalChunks());
640
641
642             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
643
644             leader.handleMessage(leaderActor, new SendHeartBeat());
645
646             o = MessageCollectorActor.getAllMessages(followerActor).get(1);
647
648             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
649
650             installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
651
652             assertEquals(1, installSnapshot.getChunkIndex());
653             assertEquals(3, installSnapshot.getTotalChunks());
654
655             followerActor.tell(PoisonPill.getInstance(), getRef());
656         }};
657     }
658
659     @Test
660     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
661         new JavaTestKit(getSystem()) {
662             {
663
664                 TestActorRef<MessageCollectorActor> followerActor =
665                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
666
667                 Map<String, String> peerAddresses = new HashMap<>();
668                 peerAddresses.put(followerActor.path().toString(),
669                         followerActor.path().toString());
670
671                 final int followersLastIndex = 2;
672                 final int snapshotIndex = 3;
673                 final int snapshotTerm = 1;
674                 final int currentTerm = 2;
675
676                 MockRaftActorContext actorContext =
677                         (MockRaftActorContext) createActorContext();
678
679                 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
680                     @Override
681                     public int getSnapshotChunkSize() {
682                         return 50;
683                     }
684                 });
685                 actorContext.setPeerAddresses(peerAddresses);
686                 actorContext.setCommitIndex(followersLastIndex);
687
688                 MockLeader leader = new MockLeader(actorContext);
689
690                 Map<String, String> leadersSnapshot = new HashMap<>();
691                 leadersSnapshot.put("1", "A");
692                 leadersSnapshot.put("2", "B");
693                 leadersSnapshot.put("3", "C");
694
695                 // set the snapshot variables in replicatedlog
696                 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
697                 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
698                 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
699
700                 ByteString bs = toByteString(leadersSnapshot);
701                 leader.setSnapshot(Optional.of(bs));
702
703                 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
704
705                 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
706
707                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
708
709                 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
710
711                 assertEquals(1, installSnapshot.getChunkIndex());
712                 assertEquals(3, installSnapshot.getTotalChunks());
713                 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
714
715                 int hashCode = installSnapshot.getData().hashCode();
716
717                 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
718
719                 leader.handleMessage(leaderActor, new SendHeartBeat());
720
721                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
722
723                 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
724
725                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
726
727                 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
728
729                 assertEquals(2, installSnapshot.getChunkIndex());
730                 assertEquals(3, installSnapshot.getTotalChunks());
731                 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
732
733                 followerActor.tell(PoisonPill.getInstance(), getRef());
734             }};
735     }
736
737     @Test
738     public void testFollowerToSnapshotLogic() {
739
740         MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
741
742         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
743             @Override
744             public int getSnapshotChunkSize() {
745                 return 50;
746             }
747         });
748
749         MockLeader leader = new MockLeader(actorContext);
750
751         Map<String, String> leadersSnapshot = new HashMap<>();
752         leadersSnapshot.put("1", "A");
753         leadersSnapshot.put("2", "B");
754         leadersSnapshot.put("3", "C");
755
756         ByteString bs = toByteString(leadersSnapshot);
757         byte[] barray = bs.toByteArray();
758
759         leader.createFollowerToSnapshot("followerId", bs);
760         assertEquals(bs.size(), barray.length);
761
762         int chunkIndex=0;
763         for (int i=0; i < barray.length; i = i + 50) {
764             int j = i + 50;
765             chunkIndex++;
766
767             if (i + 50 > barray.length) {
768                 j = barray.length;
769             }
770
771             ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
772             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
773             assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
774
775             leader.getFollowerToSnapshot().markSendStatus(true);
776             if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
777                 leader.getFollowerToSnapshot().incrementChunkIndex();
778             }
779         }
780
781         assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
782     }
783
784
785     @Override protected RaftActorBehavior createBehavior(
786         RaftActorContext actorContext) {
787         return new Leader(actorContext);
788     }
789
790     @Override protected RaftActorContext createActorContext() {
791         return createActorContext(leaderActor);
792     }
793
794     @Override
795     protected RaftActorContext createActorContext(ActorRef actorRef) {
796         return new MockRaftActorContext("test", getSystem(), actorRef);
797     }
798
799     private ByteString toByteString(Map<String, String> state) {
800         ByteArrayOutputStream b = null;
801         ObjectOutputStream o = null;
802         try {
803             try {
804                 b = new ByteArrayOutputStream();
805                 o = new ObjectOutputStream(b);
806                 o.writeObject(state);
807                 byte[] snapshotBytes = b.toByteArray();
808                 return ByteString.copyFrom(snapshotBytes);
809             } finally {
810                 if (o != null) {
811                     o.flush();
812                     o.close();
813                 }
814                 if (b != null) {
815                     b.close();
816                 }
817             }
818         } catch (IOException e) {
819             Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
820         }
821         return null;
822     }
823
824     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
825         private static AbstractRaftActorBehavior behavior;
826
827         public ForwardMessageToBehaviorActor(){
828
829         }
830
831         @Override public void onReceive(Object message) throws Exception {
832             super.onReceive(message);
833             behavior.handleMessage(sender(), message);
834         }
835
836         public static void setBehavior(AbstractRaftActorBehavior behavior){
837             ForwardMessageToBehaviorActor.behavior = behavior;
838         }
839     }
840
841     @Test
842     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
843         new JavaTestKit(getSystem()) {{
844
845             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
846
847             MockRaftActorContext leaderActorContext =
848                 new MockRaftActorContext("leader", getSystem(), leaderActor);
849
850             ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
851
852             MockRaftActorContext followerActorContext =
853                 new MockRaftActorContext("follower", getSystem(), followerActor);
854
855             Follower follower = new Follower(followerActorContext);
856
857             ForwardMessageToBehaviorActor.setBehavior(follower);
858
859             Map<String, String> peerAddresses = new HashMap<>();
860             peerAddresses.put(followerActor.path().toString(),
861                 followerActor.path().toString());
862
863             leaderActorContext.setPeerAddresses(peerAddresses);
864
865             leaderActorContext.getReplicatedLog().removeFrom(0);
866
867             //create 3 entries
868             leaderActorContext.setReplicatedLog(
869                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
870
871             leaderActorContext.setCommitIndex(1);
872
873             followerActorContext.getReplicatedLog().removeFrom(0);
874
875             // follower too has the exact same log entries and has the same commit index
876             followerActorContext.setReplicatedLog(
877                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
878
879             followerActorContext.setCommitIndex(1);
880
881             Leader leader = new Leader(leaderActorContext);
882             leader.markFollowerActive(followerActor.path().toString());
883
884             leader.handleMessage(leaderActor, new SendHeartBeat());
885
886             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
887                     .getFirstMatching(followerActor, AppendEntries.class);
888
889             assertNotNull(appendEntries);
890
891             assertEquals(1, appendEntries.getLeaderCommit());
892             assertEquals(1, appendEntries.getEntries().get(0).getIndex());
893             assertEquals(0, appendEntries.getPrevLogIndex());
894
895             AppendEntriesReply appendEntriesReply =
896                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
897                     leaderActor, AppendEntriesReply.class);
898
899             assertNotNull(appendEntriesReply);
900
901             // follower returns its next index
902             assertEquals(2, appendEntriesReply.getLogLastIndex());
903             assertEquals(1, appendEntriesReply.getLogLastTerm());
904
905         }};
906     }
907
908
909     @Test
910     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
911         new JavaTestKit(getSystem()) {{
912
913             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
914
915             MockRaftActorContext leaderActorContext =
916                 new MockRaftActorContext("leader", getSystem(), leaderActor);
917
918             ActorRef followerActor = getSystem().actorOf(
919                 Props.create(ForwardMessageToBehaviorActor.class));
920
921             MockRaftActorContext followerActorContext =
922                 new MockRaftActorContext("follower", getSystem(), followerActor);
923
924             Follower follower = new Follower(followerActorContext);
925
926             ForwardMessageToBehaviorActor.setBehavior(follower);
927
928             Map<String, String> peerAddresses = new HashMap<>();
929             peerAddresses.put(followerActor.path().toString(),
930                 followerActor.path().toString());
931
932             leaderActorContext.setPeerAddresses(peerAddresses);
933
934             leaderActorContext.getReplicatedLog().removeFrom(0);
935
936             leaderActorContext.setReplicatedLog(
937                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
938
939             leaderActorContext.setCommitIndex(1);
940
941             followerActorContext.getReplicatedLog().removeFrom(0);
942
943             followerActorContext.setReplicatedLog(
944                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
945
946             // follower has the same log entries but its commit index > leaders commit index
947             followerActorContext.setCommitIndex(2);
948
949             Leader leader = new Leader(leaderActorContext);
950             leader.markFollowerActive(followerActor.path().toString());
951
952             leader.handleMessage(leaderActor, new SendHeartBeat());
953
954             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
955                     .getFirstMatching(followerActor, AppendEntries.class);
956
957             assertNotNull(appendEntries);
958
959             assertEquals(1, appendEntries.getLeaderCommit());
960             assertEquals(1, appendEntries.getEntries().get(0).getIndex());
961             assertEquals(0, appendEntries.getPrevLogIndex());
962
963             AppendEntriesReply appendEntriesReply =
964                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
965                     leaderActor, AppendEntriesReply.class);
966
967             assertNotNull(appendEntriesReply);
968
969             assertEquals(2, appendEntriesReply.getLogLastIndex());
970             assertEquals(1, appendEntriesReply.getLogLastTerm());
971
972         }};
973     }
974
975     @Test
976     public void testHandleAppendEntriesReplyFailure(){
977         new JavaTestKit(getSystem()) {
978             {
979
980                 ActorRef leaderActor =
981                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
982
983                 ActorRef followerActor =
984                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
985
986
987                 MockRaftActorContext leaderActorContext =
988                     new MockRaftActorContext("leader", getSystem(), leaderActor);
989
990                 Map<String, String> peerAddresses = new HashMap<>();
991                 peerAddresses.put("follower-1",
992                     followerActor.path().toString());
993
994                 leaderActorContext.setPeerAddresses(peerAddresses);
995
996                 Leader leader = new Leader(leaderActorContext);
997
998                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
999
1000                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1001
1002                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1003
1004             }};
1005     }
1006
1007     @Test
1008     public void testHandleAppendEntriesReplySuccess() throws Exception {
1009         new JavaTestKit(getSystem()) {
1010             {
1011
1012                 ActorRef leaderActor =
1013                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1014
1015                 ActorRef followerActor =
1016                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1017
1018
1019                 MockRaftActorContext leaderActorContext =
1020                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1021
1022                 leaderActorContext.setReplicatedLog(
1023                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1024
1025                 Map<String, String> peerAddresses = new HashMap<>();
1026                 peerAddresses.put("follower-1",
1027                     followerActor.path().toString());
1028
1029                 leaderActorContext.setPeerAddresses(peerAddresses);
1030                 leaderActorContext.setCommitIndex(1);
1031                 leaderActorContext.setLastApplied(1);
1032                 leaderActorContext.getTermInformation().update(1, "leader");
1033
1034                 Leader leader = new Leader(leaderActorContext);
1035
1036                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1037
1038                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1039
1040                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1041
1042                 assertEquals(2, leaderActorContext.getCommitIndex());
1043
1044                 ApplyLogEntries applyLogEntries =
1045                     (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
1046                         ApplyLogEntries.class);
1047
1048                 assertNotNull(applyLogEntries);
1049
1050                 assertEquals(2, leaderActorContext.getLastApplied());
1051
1052                 assertEquals(2, applyLogEntries.getToIndex());
1053
1054                 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1055                     ApplyState.class);
1056
1057                 assertEquals(1,applyStateList.size());
1058
1059                 ApplyState applyState = (ApplyState) applyStateList.get(0);
1060
1061                 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1062
1063             }};
1064     }
1065
1066     @Test
1067     public void testHandleAppendEntriesReplyUnknownFollower(){
1068         new JavaTestKit(getSystem()) {
1069             {
1070
1071                 ActorRef leaderActor =
1072                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1073
1074                 MockRaftActorContext leaderActorContext =
1075                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1076
1077                 Leader leader = new Leader(leaderActorContext);
1078
1079                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1080
1081                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1082
1083                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1084
1085             }};
1086     }
1087
1088     @Test
1089     public void testHandleRequestVoteReply(){
1090         new JavaTestKit(getSystem()) {
1091             {
1092
1093                 ActorRef leaderActor =
1094                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1095
1096                 MockRaftActorContext leaderActorContext =
1097                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1098
1099                 Leader leader = new Leader(leaderActorContext);
1100
1101                 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1102
1103                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1104
1105                 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1106
1107                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1108             }};
1109     }
1110
1111     @Test
1112     public void testIsolatedLeaderCheckNoFollowers() {
1113         new JavaTestKit(getSystem()) {{
1114             ActorRef leaderActor = getTestActor();
1115
1116             MockRaftActorContext leaderActorContext =
1117                 new MockRaftActorContext("leader", getSystem(), leaderActor);
1118
1119             Map<String, String> peerAddresses = new HashMap<>();
1120             leaderActorContext.setPeerAddresses(peerAddresses);
1121
1122             Leader leader = new Leader(leaderActorContext);
1123             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1124             Assert.assertTrue(behavior instanceof Leader);
1125         }};
1126     }
1127
1128     @Test
1129     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1130         new JavaTestKit(getSystem()) {{
1131
1132             ActorRef followerActor1 = getTestActor();
1133             ActorRef followerActor2 = getTestActor();
1134
1135             MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1136
1137             Map<String, String> peerAddresses = new HashMap<>();
1138             peerAddresses.put("follower-1", followerActor1.path().toString());
1139             peerAddresses.put("follower-2", followerActor2.path().toString());
1140
1141             leaderActorContext.setPeerAddresses(peerAddresses);
1142
1143             Leader leader = new Leader(leaderActorContext);
1144             leader.stopIsolatedLeaderCheckSchedule();
1145
1146             leader.markFollowerActive("follower-1");
1147             leader.markFollowerActive("follower-2");
1148             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1149             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1150                 behavior instanceof Leader);
1151
1152             // kill 1 follower and verify if that got killed
1153             final JavaTestKit probe = new JavaTestKit(getSystem());
1154             probe.watch(followerActor1);
1155             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1156             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1157             assertEquals(termMsg1.getActor(), followerActor1);
1158
1159             leader.markFollowerInActive("follower-1");
1160             leader.markFollowerActive("follower-2");
1161             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1162             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1163                 behavior instanceof Leader);
1164
1165             // kill 2nd follower and leader should change to Isolated leader
1166             followerActor2.tell(PoisonPill.getInstance(), null);
1167             probe.watch(followerActor2);
1168             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1169             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1170             assertEquals(termMsg2.getActor(), followerActor2);
1171
1172             leader.markFollowerInActive("follower-2");
1173             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1174             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1175                 behavior instanceof IsolatedLeader);
1176
1177         }};
1178     }
1179
1180     class MockLeader extends Leader {
1181
1182         FollowerToSnapshot fts;
1183
1184         public MockLeader(RaftActorContext context){
1185             super(context);
1186         }
1187
1188         public FollowerToSnapshot getFollowerToSnapshot() {
1189             return fts;
1190         }
1191
1192         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1193             fts = new FollowerToSnapshot(bs);
1194             setFollowerSnapshot(followerId, fts);
1195         }
1196     }
1197
1198     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1199
1200         private final long electionTimeOutIntervalMillis;
1201         private final int snapshotChunkSize;
1202
1203         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1204             super();
1205             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1206             this.snapshotChunkSize = snapshotChunkSize;
1207         }
1208
1209         @Override
1210         public FiniteDuration getElectionTimeOutInterval() {
1211             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1212         }
1213
1214         @Override
1215         public int getSnapshotChunkSize() {
1216             return snapshotChunkSize;
1217         }
1218     }
1219 }