Merge changes I880310f2,I9f437328,I552372db,I587fb203,I05f0bd94, ...
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Assert;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftState;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
47 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
48 import scala.concurrent.duration.FiniteDuration;
49
50 public class LeaderTest extends AbstractRaftActorBehaviorTest {
51
52     private ActorRef leaderActor =
53         getSystem().actorOf(Props.create(DoNothingActor.class));
54     private ActorRef senderActor =
55         getSystem().actorOf(Props.create(DoNothingActor.class));
56
57     @Test
58     public void testHandleMessageForUnknownMessage() throws Exception {
59         new JavaTestKit(getSystem()) {{
60             Leader leader =
61                 new Leader(createActorContext());
62
63             // handle message should return the Leader state when it receives an
64             // unknown message
65             RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
66             Assert.assertTrue(behavior instanceof Leader);
67         }};
68     }
69
70     @Test
71     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
72         new JavaTestKit(getSystem()) {{
73
74             new Within(duration("1 seconds")) {
75                 @Override
76                 protected void run() {
77
78                     ActorRef followerActor = getTestActor();
79
80                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
81
82                     Map<String, String> peerAddresses = new HashMap<>();
83
84                     peerAddresses.put(followerActor.path().toString(),
85                         followerActor.path().toString());
86
87                     actorContext.setPeerAddresses(peerAddresses);
88
89                     Leader leader = new Leader(actorContext);
90                     leader.handleMessage(senderActor, new SendHeartBeat());
91
92                     final String out =
93                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
94                             // do not put code outside this method, will run afterwards
95                             @Override
96                             protected String match(Object in) {
97                                 Object msg = fromSerializableMessage(in);
98                                 if (msg instanceof AppendEntries) {
99                                     if (((AppendEntries)msg).getTerm() == 0) {
100                                         return "match";
101                                     }
102                                     return null;
103                                 } else {
104                                     throw noMatch();
105                                 }
106                             }
107                         }.get(); // this extracts the received message
108
109                     assertEquals("match", out);
110
111                 }
112             };
113         }};
114     }
115
116     @Test
117     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
118         new JavaTestKit(getSystem()) {{
119
120             new Within(duration("1 seconds")) {
121                 @Override
122                 protected void run() {
123
124                     ActorRef followerActor = getTestActor();
125
126                     MockRaftActorContext actorContext =
127                         (MockRaftActorContext) createActorContext();
128
129                     Map<String, String> peerAddresses = new HashMap<>();
130
131                     peerAddresses.put(followerActor.path().toString(),
132                             followerActor.path().toString());
133
134                     actorContext.setPeerAddresses(peerAddresses);
135
136                     Leader leader = new Leader(actorContext);
137                     RaftActorBehavior raftBehavior = leader
138                         .handleMessage(senderActor, new Replicate(null, null,
139                             new MockRaftActorContext.MockReplicatedLogEntry(1,
140                                 100,
141                                 new MockRaftActorContext.MockPayload("foo"))
142                         ));
143
144                     // State should not change
145                     assertTrue(raftBehavior instanceof Leader);
146
147                     final String out =
148                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
149                             // do not put code outside this method, will run afterwards
150                             @Override
151                             protected String match(Object in) {
152                                 Object msg = fromSerializableMessage(in);
153                                 if (msg instanceof AppendEntries) {
154                                     if (((AppendEntries)msg).getTerm() == 0) {
155                                         return "match";
156                                     }
157                                     return null;
158                                 } else {
159                                     throw noMatch();
160                                 }
161                             }
162                         }.get(); // this extracts the received message
163
164                     assertEquals("match", out);
165                 }
166             };
167         }};
168     }
169
170     @Test
171     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
172         new JavaTestKit(getSystem()) {{
173
174             new Within(duration("1 seconds")) {
175                 @Override
176                 protected void run() {
177
178                     ActorRef raftActor = getTestActor();
179
180                     MockRaftActorContext actorContext =
181                         new MockRaftActorContext("test", getSystem(), raftActor);
182
183                     actorContext.getReplicatedLog().removeFrom(0);
184
185                     actorContext.setReplicatedLog(
186                         new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
187                             .build());
188
189                     Leader leader = new Leader(actorContext);
190                     RaftActorBehavior raftBehavior = leader
191                         .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
192
193                     // State should not change
194                     assertTrue(raftBehavior instanceof Leader);
195
196                     assertEquals(1, actorContext.getCommitIndex());
197
198                     final String out =
199                         new ExpectMsg<String>(duration("1 seconds"),
200                             "match hint") {
201                             // do not put code outside this method, will run afterwards
202                             @Override
203                             protected String match(Object in) {
204                                 if (in instanceof ApplyState) {
205                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
206                                         return "match";
207                                     }
208                                     return null;
209                                 } else {
210                                     throw noMatch();
211                                 }
212                             }
213                         }.get(); // this extracts the received message
214
215                     assertEquals("match", out);
216
217                 }
218             };
219         }};
220     }
221
222     @Test
223     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
224         new JavaTestKit(getSystem()) {{
225             ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
226
227             Map<String, String> peerAddresses = new HashMap<>();
228             peerAddresses.put(followerActor.path().toString(),
229                 followerActor.path().toString());
230
231             MockRaftActorContext actorContext =
232                 (MockRaftActorContext) createActorContext(leaderActor);
233             actorContext.setPeerAddresses(peerAddresses);
234
235             Map<String, String> leadersSnapshot = new HashMap<>();
236             leadersSnapshot.put("1", "A");
237             leadersSnapshot.put("2", "B");
238             leadersSnapshot.put("3", "C");
239
240             //clears leaders log
241             actorContext.getReplicatedLog().removeFrom(0);
242
243             final int followersLastIndex = 2;
244             final int snapshotIndex = 3;
245             final int newEntryIndex = 4;
246             final int snapshotTerm = 1;
247             final int currentTerm = 2;
248
249             // set the snapshot variables in replicatedlog
250             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
251             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
252             actorContext.setCommitIndex(followersLastIndex);
253             //set follower timeout to 2 mins, helps during debugging
254             actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
255
256             MockLeader leader = new MockLeader(actorContext);
257
258             // new entry
259             ReplicatedLogImplEntry entry =
260                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
261                     new MockRaftActorContext.MockPayload("D"));
262
263             //update follower timestamp
264             leader.markFollowerActive(followerActor.path().toString());
265
266             ByteString bs = toByteString(leadersSnapshot);
267             leader.setSnapshot(Optional.of(bs));
268             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
269
270             //send first chunk and no InstallSnapshotReply received yet
271             leader.getFollowerToSnapshot().getNextChunk();
272             leader.getFollowerToSnapshot().incrementChunkIndex();
273
274             leader.handleMessage(leaderActor, new SendHeartBeat());
275
276             AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
277                 followerActor, AppendEntries.SERIALIZABLE_CLASS);
278
279             assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
280                 "received", aeproto);
281
282             AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
283
284             assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
285
286             //InstallSnapshotReply received
287             leader.getFollowerToSnapshot().markSendStatus(true);
288
289             leader.handleMessage(senderActor, new SendHeartBeat());
290
291             InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
292                 MessageCollectorActor.getFirstMatching(followerActor,
293                     InstallSnapshot.SERIALIZABLE_CLASS);
294
295             assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
296                 isproto);
297
298             InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
299
300             assertEquals(snapshotIndex, is.getLastIncludedIndex());
301
302         }};
303     }
304
305     @Test
306     public void testSendAppendEntriesSnapshotScenario() {
307         new JavaTestKit(getSystem()) {{
308
309             ActorRef followerActor = getTestActor();
310
311             Map<String, String> peerAddresses = new HashMap<>();
312             peerAddresses.put(followerActor.path().toString(),
313                 followerActor.path().toString());
314
315             MockRaftActorContext actorContext =
316                 (MockRaftActorContext) createActorContext(getRef());
317             actorContext.setPeerAddresses(peerAddresses);
318
319             Map<String, String> leadersSnapshot = new HashMap<>();
320             leadersSnapshot.put("1", "A");
321             leadersSnapshot.put("2", "B");
322             leadersSnapshot.put("3", "C");
323
324             //clears leaders log
325             actorContext.getReplicatedLog().removeFrom(0);
326
327             final int followersLastIndex = 2;
328             final int snapshotIndex = 3;
329             final int newEntryIndex = 4;
330             final int snapshotTerm = 1;
331             final int currentTerm = 2;
332
333             // set the snapshot variables in replicatedlog
334             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
335             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
336             actorContext.setCommitIndex(followersLastIndex);
337
338             Leader leader = new Leader(actorContext);
339
340             // new entry
341             ReplicatedLogImplEntry entry =
342                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
343                     new MockRaftActorContext.MockPayload("D"));
344
345             //update follower timestamp
346             leader.markFollowerActive(followerActor.path().toString());
347
348             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
349             RaftActorBehavior raftBehavior = leader.handleMessage(
350                 senderActor, new Replicate(null, "state-id", entry));
351
352             assertTrue(raftBehavior instanceof Leader);
353
354             // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
355             Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
356                 @Override
357                 protected Boolean match(Object o) throws Exception {
358                     if (o instanceof InitiateInstallSnapshot) {
359                         return true;
360                     }
361                     return false;
362                 }
363             }.get();
364
365             boolean initiateInitiateInstallSnapshot = false;
366             for (Boolean b: matches) {
367                 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
368             }
369
370             assertTrue(initiateInitiateInstallSnapshot);
371         }};
372     }
373
374     @Test
375     public void testInitiateInstallSnapshot() throws Exception {
376         new JavaTestKit(getSystem()) {{
377
378             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
379
380             ActorRef followerActor = getTestActor();
381
382             Map<String, String> peerAddresses = new HashMap<>();
383             peerAddresses.put(followerActor.path().toString(),
384                 followerActor.path().toString());
385
386
387             MockRaftActorContext actorContext =
388                 (MockRaftActorContext) createActorContext(leaderActor);
389             actorContext.setPeerAddresses(peerAddresses);
390
391             Map<String, String> leadersSnapshot = new HashMap<>();
392             leadersSnapshot.put("1", "A");
393             leadersSnapshot.put("2", "B");
394             leadersSnapshot.put("3", "C");
395
396             //clears leaders log
397             actorContext.getReplicatedLog().removeFrom(0);
398
399             final int followersLastIndex = 2;
400             final int snapshotIndex = 3;
401             final int newEntryIndex = 4;
402             final int snapshotTerm = 1;
403             final int currentTerm = 2;
404
405             // set the snapshot variables in replicatedlog
406             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
407             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
408             actorContext.setLastApplied(3);
409             actorContext.setCommitIndex(followersLastIndex);
410
411             Leader leader = new Leader(actorContext);
412             // set the snapshot as absent and check if capture-snapshot is invoked.
413             leader.setSnapshot(Optional.<ByteString>absent());
414
415             // new entry
416             ReplicatedLogImplEntry entry =
417                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
418                     new MockRaftActorContext.MockPayload("D"));
419
420             actorContext.getReplicatedLog().append(entry);
421
422             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
423             RaftActorBehavior raftBehavior = leader.handleMessage(
424                 leaderActor, new InitiateInstallSnapshot());
425
426             CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
427                 getFirstMatching(leaderActor, CaptureSnapshot.class);
428
429             assertNotNull(cs);
430
431             assertTrue(cs.isInstallSnapshotInitiated());
432             assertEquals(3, cs.getLastAppliedIndex());
433             assertEquals(1, cs.getLastAppliedTerm());
434             assertEquals(4, cs.getLastIndex());
435             assertEquals(2, cs.getLastTerm());
436         }};
437     }
438
439     @Test
440     public void testInstallSnapshot() {
441         new JavaTestKit(getSystem()) {{
442
443             ActorRef followerActor = getTestActor();
444
445             Map<String, String> peerAddresses = new HashMap<>();
446             peerAddresses.put(followerActor.path().toString(),
447                 followerActor.path().toString());
448
449             MockRaftActorContext actorContext =
450                 (MockRaftActorContext) createActorContext();
451             actorContext.setPeerAddresses(peerAddresses);
452
453
454             Map<String, String> leadersSnapshot = new HashMap<>();
455             leadersSnapshot.put("1", "A");
456             leadersSnapshot.put("2", "B");
457             leadersSnapshot.put("3", "C");
458
459             //clears leaders log
460             actorContext.getReplicatedLog().removeFrom(0);
461
462             final int followersLastIndex = 2;
463             final int snapshotIndex = 3;
464             final int newEntryIndex = 4;
465             final int snapshotTerm = 1;
466             final int currentTerm = 2;
467
468             // set the snapshot variables in replicatedlog
469             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
470             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
471             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
472             actorContext.setCommitIndex(followersLastIndex);
473
474             Leader leader = new Leader(actorContext);
475
476             // new entry
477             ReplicatedLogImplEntry entry =
478                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
479                     new MockRaftActorContext.MockPayload("D"));
480
481             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
482                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
483
484             assertTrue(raftBehavior instanceof Leader);
485
486             // check if installsnapshot gets called with the correct values.
487             final String out =
488                 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
489                     // do not put code outside this method, will run afterwards
490                     @Override
491                     protected String match(Object in) {
492                         if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
493                             InstallSnapshot is = (InstallSnapshot)
494                                 SerializationUtils.fromSerializable(in);
495                             if (is.getData() == null) {
496                                 return "InstallSnapshot data is null";
497                             }
498                             if (is.getLastIncludedIndex() != snapshotIndex) {
499                                 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
500                             }
501                             if (is.getLastIncludedTerm() != snapshotTerm) {
502                                 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
503                             }
504                             if (is.getTerm() == currentTerm) {
505                                 return is.getTerm() + "!=" + currentTerm;
506                             }
507
508                             return "match";
509
510                         } else {
511                             return "message mismatch:" + in.getClass();
512                         }
513                     }
514                 }.get(); // this extracts the received message
515
516             assertEquals("match", out);
517         }};
518     }
519
520     @Test
521     public void testHandleInstallSnapshotReplyLastChunk() {
522         new JavaTestKit(getSystem()) {{
523
524             ActorRef followerActor = getTestActor();
525
526             Map<String, String> peerAddresses = new HashMap<>();
527             peerAddresses.put(followerActor.path().toString(),
528                 followerActor.path().toString());
529
530             final int followersLastIndex = 2;
531             final int snapshotIndex = 3;
532             final int newEntryIndex = 4;
533             final int snapshotTerm = 1;
534             final int currentTerm = 2;
535
536             MockRaftActorContext actorContext =
537                 (MockRaftActorContext) createActorContext();
538             actorContext.setPeerAddresses(peerAddresses);
539             actorContext.setCommitIndex(followersLastIndex);
540
541             MockLeader leader = new MockLeader(actorContext);
542
543             Map<String, String> leadersSnapshot = new HashMap<>();
544             leadersSnapshot.put("1", "A");
545             leadersSnapshot.put("2", "B");
546             leadersSnapshot.put("3", "C");
547
548             // set the snapshot variables in replicatedlog
549
550             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
551             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
552             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
553
554             ByteString bs = toByteString(leadersSnapshot);
555             leader.setSnapshot(Optional.of(bs));
556             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
557             while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
558                 leader.getFollowerToSnapshot().getNextChunk();
559                 leader.getFollowerToSnapshot().incrementChunkIndex();
560             }
561
562             //clears leaders log
563             actorContext.getReplicatedLog().removeFrom(0);
564
565             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
566                 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
567                     leader.getFollowerToSnapshot().getChunkIndex(), true));
568
569             assertTrue(raftBehavior instanceof Leader);
570
571             assertEquals(0, leader.followerSnapshotSize());
572             assertEquals(1, leader.followerLogSize());
573             assertNotNull(leader.getFollower(followerActor.path().toString()));
574             FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
575             assertEquals(snapshotIndex, fli.getMatchIndex());
576             assertEquals(snapshotIndex, fli.getMatchIndex());
577             assertEquals(snapshotIndex + 1, fli.getNextIndex());
578         }};
579     }
580
581     @Test
582     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
583         new JavaTestKit(getSystem()) {{
584
585             TestActorRef<MessageCollectorActor> followerActor =
586                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
587
588             Map<String, String> peerAddresses = new HashMap<>();
589             peerAddresses.put(followerActor.path().toString(),
590                     followerActor.path().toString());
591
592             final int followersLastIndex = 2;
593             final int snapshotIndex = 3;
594             final int snapshotTerm = 1;
595             final int currentTerm = 2;
596
597             MockRaftActorContext actorContext =
598                     (MockRaftActorContext) createActorContext();
599
600             actorContext.setConfigParams(new DefaultConfigParamsImpl(){
601                 @Override
602                 public int getSnapshotChunkSize() {
603                     return 50;
604                 }
605             });
606             actorContext.setPeerAddresses(peerAddresses);
607             actorContext.setCommitIndex(followersLastIndex);
608
609             MockLeader leader = new MockLeader(actorContext);
610
611             Map<String, String> leadersSnapshot = new HashMap<>();
612             leadersSnapshot.put("1", "A");
613             leadersSnapshot.put("2", "B");
614             leadersSnapshot.put("3", "C");
615
616             // set the snapshot variables in replicatedlog
617             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
618             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
619             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
620
621             ByteString bs = toByteString(leadersSnapshot);
622             leader.setSnapshot(Optional.of(bs));
623
624             leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
625
626             Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
627
628             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
629
630             InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
631
632             assertEquals(1, installSnapshot.getChunkIndex());
633             assertEquals(3, installSnapshot.getTotalChunks());
634
635
636             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
637
638             leader.handleMessage(leaderActor, new SendHeartBeat());
639
640             o = MessageCollectorActor.getAllMessages(followerActor).get(1);
641
642             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
643
644             installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
645
646             assertEquals(1, installSnapshot.getChunkIndex());
647             assertEquals(3, installSnapshot.getTotalChunks());
648
649             followerActor.tell(PoisonPill.getInstance(), getRef());
650         }};
651     }
652
653     @Test
654     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
655         new JavaTestKit(getSystem()) {
656             {
657
658                 TestActorRef<MessageCollectorActor> followerActor =
659                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
660
661                 Map<String, String> peerAddresses = new HashMap<>();
662                 peerAddresses.put(followerActor.path().toString(),
663                         followerActor.path().toString());
664
665                 final int followersLastIndex = 2;
666                 final int snapshotIndex = 3;
667                 final int snapshotTerm = 1;
668                 final int currentTerm = 2;
669
670                 MockRaftActorContext actorContext =
671                         (MockRaftActorContext) createActorContext();
672
673                 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
674                     @Override
675                     public int getSnapshotChunkSize() {
676                         return 50;
677                     }
678                 });
679                 actorContext.setPeerAddresses(peerAddresses);
680                 actorContext.setCommitIndex(followersLastIndex);
681
682                 MockLeader leader = new MockLeader(actorContext);
683
684                 Map<String, String> leadersSnapshot = new HashMap<>();
685                 leadersSnapshot.put("1", "A");
686                 leadersSnapshot.put("2", "B");
687                 leadersSnapshot.put("3", "C");
688
689                 // set the snapshot variables in replicatedlog
690                 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
691                 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
692                 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
693
694                 ByteString bs = toByteString(leadersSnapshot);
695                 leader.setSnapshot(Optional.of(bs));
696
697                 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
698
699                 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
700
701                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
702
703                 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
704
705                 assertEquals(1, installSnapshot.getChunkIndex());
706                 assertEquals(3, installSnapshot.getTotalChunks());
707                 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
708
709                 int hashCode = installSnapshot.getData().hashCode();
710
711                 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
712
713                 leader.handleMessage(leaderActor, new SendHeartBeat());
714
715                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
716
717                 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
718
719                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
720
721                 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
722
723                 assertEquals(2, installSnapshot.getChunkIndex());
724                 assertEquals(3, installSnapshot.getTotalChunks());
725                 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
726
727                 followerActor.tell(PoisonPill.getInstance(), getRef());
728             }};
729     }
730
731     @Test
732     public void testFollowerToSnapshotLogic() {
733
734         MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
735
736         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
737             @Override
738             public int getSnapshotChunkSize() {
739                 return 50;
740             }
741         });
742
743         MockLeader leader = new MockLeader(actorContext);
744
745         Map<String, String> leadersSnapshot = new HashMap<>();
746         leadersSnapshot.put("1", "A");
747         leadersSnapshot.put("2", "B");
748         leadersSnapshot.put("3", "C");
749
750         ByteString bs = toByteString(leadersSnapshot);
751         byte[] barray = bs.toByteArray();
752
753         leader.createFollowerToSnapshot("followerId", bs);
754         assertEquals(bs.size(), barray.length);
755
756         int chunkIndex=0;
757         for (int i=0; i < barray.length; i = i + 50) {
758             int j = i + 50;
759             chunkIndex++;
760
761             if (i + 50 > barray.length) {
762                 j = barray.length;
763             }
764
765             ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
766             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
767             assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
768
769             leader.getFollowerToSnapshot().markSendStatus(true);
770             if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
771                 leader.getFollowerToSnapshot().incrementChunkIndex();
772             }
773         }
774
775         assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
776     }
777
778
779     @Override protected RaftActorBehavior createBehavior(
780         RaftActorContext actorContext) {
781         return new Leader(actorContext);
782     }
783
784     @Override protected RaftActorContext createActorContext() {
785         return createActorContext(leaderActor);
786     }
787
788     @Override
789     protected RaftActorContext createActorContext(ActorRef actorRef) {
790         return new MockRaftActorContext("test", getSystem(), actorRef);
791     }
792
793     private ByteString toByteString(Map<String, String> state) {
794         ByteArrayOutputStream b = null;
795         ObjectOutputStream o = null;
796         try {
797             try {
798                 b = new ByteArrayOutputStream();
799                 o = new ObjectOutputStream(b);
800                 o.writeObject(state);
801                 byte[] snapshotBytes = b.toByteArray();
802                 return ByteString.copyFrom(snapshotBytes);
803             } finally {
804                 if (o != null) {
805                     o.flush();
806                     o.close();
807                 }
808                 if (b != null) {
809                     b.close();
810                 }
811             }
812         } catch (IOException e) {
813             Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
814         }
815         return null;
816     }
817
818     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
819         private static AbstractRaftActorBehavior behavior;
820
821         public ForwardMessageToBehaviorActor(){
822
823         }
824
825         @Override public void onReceive(Object message) throws Exception {
826             super.onReceive(message);
827             behavior.handleMessage(sender(), message);
828         }
829
830         public static void setBehavior(AbstractRaftActorBehavior behavior){
831             ForwardMessageToBehaviorActor.behavior = behavior;
832         }
833     }
834
835     @Test
836     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
837         new JavaTestKit(getSystem()) {{
838
839             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
840
841             MockRaftActorContext leaderActorContext =
842                 new MockRaftActorContext("leader", getSystem(), leaderActor);
843
844             ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
845
846             MockRaftActorContext followerActorContext =
847                 new MockRaftActorContext("follower", getSystem(), followerActor);
848
849             Follower follower = new Follower(followerActorContext);
850
851             ForwardMessageToBehaviorActor.setBehavior(follower);
852
853             Map<String, String> peerAddresses = new HashMap<>();
854             peerAddresses.put(followerActor.path().toString(),
855                 followerActor.path().toString());
856
857             leaderActorContext.setPeerAddresses(peerAddresses);
858
859             leaderActorContext.getReplicatedLog().removeFrom(0);
860
861             //create 3 entries
862             leaderActorContext.setReplicatedLog(
863                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
864
865             leaderActorContext.setCommitIndex(1);
866
867             followerActorContext.getReplicatedLog().removeFrom(0);
868
869             // follower too has the exact same log entries and has the same commit index
870             followerActorContext.setReplicatedLog(
871                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
872
873             followerActorContext.setCommitIndex(1);
874
875             Leader leader = new Leader(leaderActorContext);
876             leader.markFollowerActive(followerActor.path().toString());
877
878             leader.handleMessage(leaderActor, new SendHeartBeat());
879
880             AppendEntriesMessages.AppendEntries appendEntries =
881                 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
882                     .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
883
884             assertNotNull(appendEntries);
885
886             assertEquals(1, appendEntries.getLeaderCommit());
887             assertEquals(1, appendEntries.getLogEntries(0).getIndex());
888             assertEquals(0, appendEntries.getPrevLogIndex());
889
890             AppendEntriesReply appendEntriesReply =
891                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
892                     leaderActor, AppendEntriesReply.class);
893
894             assertNotNull(appendEntriesReply);
895
896             // follower returns its next index
897             assertEquals(2, appendEntriesReply.getLogLastIndex());
898             assertEquals(1, appendEntriesReply.getLogLastTerm());
899
900         }};
901     }
902
903
904     @Test
905     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
906         new JavaTestKit(getSystem()) {{
907
908             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
909
910             MockRaftActorContext leaderActorContext =
911                 new MockRaftActorContext("leader", getSystem(), leaderActor);
912
913             ActorRef followerActor = getSystem().actorOf(
914                 Props.create(ForwardMessageToBehaviorActor.class));
915
916             MockRaftActorContext followerActorContext =
917                 new MockRaftActorContext("follower", getSystem(), followerActor);
918
919             Follower follower = new Follower(followerActorContext);
920
921             ForwardMessageToBehaviorActor.setBehavior(follower);
922
923             Map<String, String> peerAddresses = new HashMap<>();
924             peerAddresses.put(followerActor.path().toString(),
925                 followerActor.path().toString());
926
927             leaderActorContext.setPeerAddresses(peerAddresses);
928
929             leaderActorContext.getReplicatedLog().removeFrom(0);
930
931             leaderActorContext.setReplicatedLog(
932                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
933
934             leaderActorContext.setCommitIndex(1);
935
936             followerActorContext.getReplicatedLog().removeFrom(0);
937
938             followerActorContext.setReplicatedLog(
939                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
940
941             // follower has the same log entries but its commit index > leaders commit index
942             followerActorContext.setCommitIndex(2);
943
944             Leader leader = new Leader(leaderActorContext);
945             leader.markFollowerActive(followerActor.path().toString());
946
947             leader.handleMessage(leaderActor, new SendHeartBeat());
948
949             AppendEntriesMessages.AppendEntries appendEntries =
950                 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
951                     .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
952
953             assertNotNull(appendEntries);
954
955             assertEquals(1, appendEntries.getLeaderCommit());
956             assertEquals(1, appendEntries.getLogEntries(0).getIndex());
957             assertEquals(0, appendEntries.getPrevLogIndex());
958
959             AppendEntriesReply appendEntriesReply =
960                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
961                     leaderActor, AppendEntriesReply.class);
962
963             assertNotNull(appendEntriesReply);
964
965             assertEquals(2, appendEntriesReply.getLogLastIndex());
966             assertEquals(1, appendEntriesReply.getLogLastTerm());
967
968         }};
969     }
970
971     @Test
972     public void testHandleAppendEntriesReplyFailure(){
973         new JavaTestKit(getSystem()) {
974             {
975
976                 ActorRef leaderActor =
977                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
978
979                 ActorRef followerActor =
980                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
981
982
983                 MockRaftActorContext leaderActorContext =
984                     new MockRaftActorContext("leader", getSystem(), leaderActor);
985
986                 Map<String, String> peerAddresses = new HashMap<>();
987                 peerAddresses.put("follower-1",
988                     followerActor.path().toString());
989
990                 leaderActorContext.setPeerAddresses(peerAddresses);
991
992                 Leader leader = new Leader(leaderActorContext);
993
994                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
995
996                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
997
998                 assertEquals(RaftState.Leader, raftActorBehavior.state());
999
1000             }};
1001     }
1002
1003     @Test
1004     public void testHandleAppendEntriesReplySuccess() throws Exception {
1005         new JavaTestKit(getSystem()) {
1006             {
1007
1008                 ActorRef leaderActor =
1009                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1010
1011                 ActorRef followerActor =
1012                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1013
1014
1015                 MockRaftActorContext leaderActorContext =
1016                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1017
1018                 leaderActorContext.setReplicatedLog(
1019                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1020
1021                 Map<String, String> peerAddresses = new HashMap<>();
1022                 peerAddresses.put("follower-1",
1023                     followerActor.path().toString());
1024
1025                 leaderActorContext.setPeerAddresses(peerAddresses);
1026                 leaderActorContext.setCommitIndex(1);
1027                 leaderActorContext.setLastApplied(1);
1028                 leaderActorContext.getTermInformation().update(1, "leader");
1029
1030                 Leader leader = new Leader(leaderActorContext);
1031
1032                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1033
1034                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1035
1036                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1037
1038                 assertEquals(2, leaderActorContext.getCommitIndex());
1039
1040                 ApplyLogEntries applyLogEntries =
1041                     (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
1042                         ApplyLogEntries.class);
1043
1044                 assertNotNull(applyLogEntries);
1045
1046                 assertEquals(2, leaderActorContext.getLastApplied());
1047
1048                 assertEquals(2, applyLogEntries.getToIndex());
1049
1050                 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1051                     ApplyState.class);
1052
1053                 assertEquals(1,applyStateList.size());
1054
1055                 ApplyState applyState = (ApplyState) applyStateList.get(0);
1056
1057                 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1058
1059             }};
1060     }
1061
1062     @Test
1063     public void testHandleAppendEntriesReplyUnknownFollower(){
1064         new JavaTestKit(getSystem()) {
1065             {
1066
1067                 ActorRef leaderActor =
1068                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1069
1070                 MockRaftActorContext leaderActorContext =
1071                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1072
1073                 Leader leader = new Leader(leaderActorContext);
1074
1075                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1076
1077                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1078
1079                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1080
1081             }};
1082     }
1083
1084     @Test
1085     public void testHandleRequestVoteReply(){
1086         new JavaTestKit(getSystem()) {
1087             {
1088
1089                 ActorRef leaderActor =
1090                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1091
1092                 MockRaftActorContext leaderActorContext =
1093                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1094
1095                 Leader leader = new Leader(leaderActorContext);
1096
1097                 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1098
1099                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1100
1101                 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1102
1103                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1104             }};
1105     }
1106
1107     @Test
1108     public void testIsolatedLeaderCheckNoFollowers() {
1109         new JavaTestKit(getSystem()) {{
1110             ActorRef leaderActor = getTestActor();
1111
1112             MockRaftActorContext leaderActorContext =
1113                 new MockRaftActorContext("leader", getSystem(), leaderActor);
1114
1115             Map<String, String> peerAddresses = new HashMap<>();
1116             leaderActorContext.setPeerAddresses(peerAddresses);
1117
1118             Leader leader = new Leader(leaderActorContext);
1119             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1120             Assert.assertTrue(behavior instanceof Leader);
1121         }};
1122     }
1123
1124     @Test
1125     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1126         new JavaTestKit(getSystem()) {{
1127
1128             ActorRef followerActor1 = getTestActor();
1129             ActorRef followerActor2 = getTestActor();
1130
1131             MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1132
1133             Map<String, String> peerAddresses = new HashMap<>();
1134             peerAddresses.put("follower-1", followerActor1.path().toString());
1135             peerAddresses.put("follower-2", followerActor2.path().toString());
1136
1137             leaderActorContext.setPeerAddresses(peerAddresses);
1138
1139             Leader leader = new Leader(leaderActorContext);
1140             leader.stopIsolatedLeaderCheckSchedule();
1141
1142             leader.markFollowerActive("follower-1");
1143             leader.markFollowerActive("follower-2");
1144             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1145             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1146                 behavior instanceof Leader);
1147
1148             // kill 1 follower and verify if that got killed
1149             final JavaTestKit probe = new JavaTestKit(getSystem());
1150             probe.watch(followerActor1);
1151             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1152             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1153             assertEquals(termMsg1.getActor(), followerActor1);
1154
1155             leader.markFollowerInActive("follower-1");
1156             leader.markFollowerActive("follower-2");
1157             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1158             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1159                 behavior instanceof Leader);
1160
1161             // kill 2nd follower and leader should change to Isolated leader
1162             followerActor2.tell(PoisonPill.getInstance(), null);
1163             probe.watch(followerActor2);
1164             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1165             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1166             assertEquals(termMsg2.getActor(), followerActor2);
1167
1168             leader.markFollowerInActive("follower-2");
1169             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1170             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1171                 behavior instanceof IsolatedLeader);
1172
1173         }};
1174     }
1175
1176     class MockLeader extends Leader {
1177
1178         FollowerToSnapshot fts;
1179
1180         public MockLeader(RaftActorContext context){
1181             super(context);
1182         }
1183
1184         public FollowerToSnapshot getFollowerToSnapshot() {
1185             return fts;
1186         }
1187
1188         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1189             fts = new FollowerToSnapshot(bs);
1190             setFollowerSnapshot(followerId, fts);
1191         }
1192     }
1193
1194     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1195
1196         private long electionTimeOutIntervalMillis;
1197         private int snapshotChunkSize;
1198
1199         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1200             super();
1201             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1202             this.snapshotChunkSize = snapshotChunkSize;
1203         }
1204
1205         @Override
1206         public FiniteDuration getElectionTimeOutInterval() {
1207             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1208         }
1209
1210         @Override
1211         public int getSnapshotChunkSize() {
1212             return snapshotChunkSize;
1213         }
1214     }
1215 }