Hide AbstractLeader maps
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Assert;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftState;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
47 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
48 import scala.concurrent.duration.FiniteDuration;
49
50 public class LeaderTest extends AbstractRaftActorBehaviorTest {
51
52     private ActorRef leaderActor =
53         getSystem().actorOf(Props.create(DoNothingActor.class));
54     private ActorRef senderActor =
55         getSystem().actorOf(Props.create(DoNothingActor.class));
56
57     @Test
58     public void testHandleMessageForUnknownMessage() throws Exception {
59         new JavaTestKit(getSystem()) {{
60             Leader leader =
61                 new Leader(createActorContext());
62
63             // handle message should return the Leader state when it receives an
64             // unknown message
65             RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
66             Assert.assertTrue(behavior instanceof Leader);
67         }};
68     }
69
70     @Test
71     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
72         new JavaTestKit(getSystem()) {{
73
74             new Within(duration("1 seconds")) {
75                 protected void run() {
76
77                     ActorRef followerActor = getTestActor();
78
79                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
80
81                     Map<String, String> peerAddresses = new HashMap<>();
82
83                     peerAddresses.put(followerActor.path().toString(),
84                         followerActor.path().toString());
85
86                     actorContext.setPeerAddresses(peerAddresses);
87
88                     Leader leader = new Leader(actorContext);
89                     leader.handleMessage(senderActor, new SendHeartBeat());
90
91                     final String out =
92                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
93                             // do not put code outside this method, will run afterwards
94                             protected String match(Object in) {
95                                 Object msg = fromSerializableMessage(in);
96                                 if (msg instanceof AppendEntries) {
97                                     if (((AppendEntries)msg).getTerm() == 0) {
98                                         return "match";
99                                     }
100                                     return null;
101                                 } else {
102                                     throw noMatch();
103                                 }
104                             }
105                         }.get(); // this extracts the received message
106
107                     assertEquals("match", out);
108
109                 }
110             };
111         }};
112     }
113
114     @Test
115     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
116         new JavaTestKit(getSystem()) {{
117
118             new Within(duration("1 seconds")) {
119                 protected void run() {
120
121                     ActorRef followerActor = getTestActor();
122
123                     MockRaftActorContext actorContext =
124                         (MockRaftActorContext) createActorContext();
125
126                     Map<String, String> peerAddresses = new HashMap<>();
127
128                     peerAddresses.put(followerActor.path().toString(),
129                             followerActor.path().toString());
130
131                     actorContext.setPeerAddresses(peerAddresses);
132
133                     Leader leader = new Leader(actorContext);
134                     RaftActorBehavior raftBehavior = leader
135                         .handleMessage(senderActor, new Replicate(null, null,
136                             new MockRaftActorContext.MockReplicatedLogEntry(1,
137                                 100,
138                                 new MockRaftActorContext.MockPayload("foo"))
139                         ));
140
141                     // State should not change
142                     assertTrue(raftBehavior instanceof Leader);
143
144                     final String out =
145                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
146                             // do not put code outside this method, will run afterwards
147                             protected String match(Object in) {
148                                 Object msg = fromSerializableMessage(in);
149                                 if (msg instanceof AppendEntries) {
150                                     if (((AppendEntries)msg).getTerm() == 0) {
151                                         return "match";
152                                     }
153                                     return null;
154                                 } else {
155                                     throw noMatch();
156                                 }
157                             }
158                         }.get(); // this extracts the received message
159
160                     assertEquals("match", out);
161                 }
162             };
163         }};
164     }
165
166     @Test
167     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
168         new JavaTestKit(getSystem()) {{
169
170             new Within(duration("1 seconds")) {
171                 protected void run() {
172
173                     ActorRef raftActor = getTestActor();
174
175                     MockRaftActorContext actorContext =
176                         new MockRaftActorContext("test", getSystem(), raftActor);
177
178                     actorContext.getReplicatedLog().removeFrom(0);
179
180                     actorContext.setReplicatedLog(
181                         new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
182                             .build());
183
184                     Leader leader = new Leader(actorContext);
185                     RaftActorBehavior raftBehavior = leader
186                         .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
187
188                     // State should not change
189                     assertTrue(raftBehavior instanceof Leader);
190
191                     assertEquals(1, actorContext.getCommitIndex());
192
193                     final String out =
194                         new ExpectMsg<String>(duration("1 seconds"),
195                             "match hint") {
196                             // do not put code outside this method, will run afterwards
197                             protected String match(Object in) {
198                                 if (in instanceof ApplyState) {
199                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
200                                         return "match";
201                                     }
202                                     return null;
203                                 } else {
204                                     throw noMatch();
205                                 }
206                             }
207                         }.get(); // this extracts the received message
208
209                     assertEquals("match", out);
210
211                 }
212             };
213         }};
214     }
215
216     @Test
217     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
218         new JavaTestKit(getSystem()) {{
219             ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
220
221             Map<String, String> peerAddresses = new HashMap<>();
222             peerAddresses.put(followerActor.path().toString(),
223                 followerActor.path().toString());
224
225             MockRaftActorContext actorContext =
226                 (MockRaftActorContext) createActorContext(leaderActor);
227             actorContext.setPeerAddresses(peerAddresses);
228
229             Map<String, String> leadersSnapshot = new HashMap<>();
230             leadersSnapshot.put("1", "A");
231             leadersSnapshot.put("2", "B");
232             leadersSnapshot.put("3", "C");
233
234             //clears leaders log
235             actorContext.getReplicatedLog().removeFrom(0);
236
237             final int followersLastIndex = 2;
238             final int snapshotIndex = 3;
239             final int newEntryIndex = 4;
240             final int snapshotTerm = 1;
241             final int currentTerm = 2;
242
243             // set the snapshot variables in replicatedlog
244             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
245             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
246             actorContext.setCommitIndex(followersLastIndex);
247             //set follower timeout to 2 mins, helps during debugging
248             actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
249
250             MockLeader leader = new MockLeader(actorContext);
251
252             // new entry
253             ReplicatedLogImplEntry entry =
254                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
255                     new MockRaftActorContext.MockPayload("D"));
256
257             //update follower timestamp
258             leader.markFollowerActive(followerActor.path().toString());
259
260             ByteString bs = toByteString(leadersSnapshot);
261             leader.setSnapshot(Optional.of(bs));
262             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
263
264             //send first chunk and no InstallSnapshotReply received yet
265             leader.getFollowerToSnapshot().getNextChunk();
266             leader.getFollowerToSnapshot().incrementChunkIndex();
267
268             leader.handleMessage(leaderActor, new SendHeartBeat());
269
270             AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
271                 followerActor, AppendEntries.SERIALIZABLE_CLASS);
272
273             assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
274                 "received", aeproto);
275
276             AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
277
278             assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
279
280             //InstallSnapshotReply received
281             leader.getFollowerToSnapshot().markSendStatus(true);
282
283             leader.handleMessage(senderActor, new SendHeartBeat());
284
285             InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
286                 MessageCollectorActor.getFirstMatching(followerActor,
287                     InstallSnapshot.SERIALIZABLE_CLASS);
288
289             assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
290                 isproto);
291
292             InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
293
294             assertEquals(snapshotIndex, is.getLastIncludedIndex());
295
296         }};
297     }
298
299     @Test
300     public void testSendAppendEntriesSnapshotScenario() {
301         new JavaTestKit(getSystem()) {{
302
303             ActorRef followerActor = getTestActor();
304
305             Map<String, String> peerAddresses = new HashMap<>();
306             peerAddresses.put(followerActor.path().toString(),
307                 followerActor.path().toString());
308
309             MockRaftActorContext actorContext =
310                 (MockRaftActorContext) createActorContext(getRef());
311             actorContext.setPeerAddresses(peerAddresses);
312
313             Map<String, String> leadersSnapshot = new HashMap<>();
314             leadersSnapshot.put("1", "A");
315             leadersSnapshot.put("2", "B");
316             leadersSnapshot.put("3", "C");
317
318             //clears leaders log
319             actorContext.getReplicatedLog().removeFrom(0);
320
321             final int followersLastIndex = 2;
322             final int snapshotIndex = 3;
323             final int newEntryIndex = 4;
324             final int snapshotTerm = 1;
325             final int currentTerm = 2;
326
327             // set the snapshot variables in replicatedlog
328             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
329             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
330             actorContext.setCommitIndex(followersLastIndex);
331
332             Leader leader = new Leader(actorContext);
333
334             // new entry
335             ReplicatedLogImplEntry entry =
336                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
337                     new MockRaftActorContext.MockPayload("D"));
338
339             //update follower timestamp
340             leader.markFollowerActive(followerActor.path().toString());
341
342             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
343             RaftActorBehavior raftBehavior = leader.handleMessage(
344                 senderActor, new Replicate(null, "state-id", entry));
345
346             assertTrue(raftBehavior instanceof Leader);
347
348             // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
349             Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
350                 @Override
351                 protected Boolean match(Object o) throws Exception {
352                     if (o instanceof InitiateInstallSnapshot) {
353                         return true;
354                     }
355                     return false;
356                 }
357             }.get();
358
359             boolean initiateInitiateInstallSnapshot = false;
360             for (Boolean b: matches) {
361                 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
362             }
363
364             assertTrue(initiateInitiateInstallSnapshot);
365         }};
366     }
367
368     @Test
369     public void testInitiateInstallSnapshot() throws Exception {
370         new JavaTestKit(getSystem()) {{
371
372             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
373
374             ActorRef followerActor = getTestActor();
375
376             Map<String, String> peerAddresses = new HashMap<>();
377             peerAddresses.put(followerActor.path().toString(),
378                 followerActor.path().toString());
379
380
381             MockRaftActorContext actorContext =
382                 (MockRaftActorContext) createActorContext(leaderActor);
383             actorContext.setPeerAddresses(peerAddresses);
384
385             Map<String, String> leadersSnapshot = new HashMap<>();
386             leadersSnapshot.put("1", "A");
387             leadersSnapshot.put("2", "B");
388             leadersSnapshot.put("3", "C");
389
390             //clears leaders log
391             actorContext.getReplicatedLog().removeFrom(0);
392
393             final int followersLastIndex = 2;
394             final int snapshotIndex = 3;
395             final int newEntryIndex = 4;
396             final int snapshotTerm = 1;
397             final int currentTerm = 2;
398
399             // set the snapshot variables in replicatedlog
400             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
401             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
402             actorContext.setLastApplied(3);
403             actorContext.setCommitIndex(followersLastIndex);
404
405             Leader leader = new Leader(actorContext);
406             // set the snapshot as absent and check if capture-snapshot is invoked.
407             leader.setSnapshot(Optional.<ByteString>absent());
408
409             // new entry
410             ReplicatedLogImplEntry entry =
411                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
412                     new MockRaftActorContext.MockPayload("D"));
413
414             actorContext.getReplicatedLog().append(entry);
415
416             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
417             RaftActorBehavior raftBehavior = leader.handleMessage(
418                 leaderActor, new InitiateInstallSnapshot());
419
420             CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
421                 getFirstMatching(leaderActor, CaptureSnapshot.class);
422
423             assertNotNull(cs);
424
425             assertTrue(cs.isInstallSnapshotInitiated());
426             assertEquals(3, cs.getLastAppliedIndex());
427             assertEquals(1, cs.getLastAppliedTerm());
428             assertEquals(4, cs.getLastIndex());
429             assertEquals(2, cs.getLastTerm());
430         }};
431     }
432
433     @Test
434     public void testInstallSnapshot() {
435         new JavaTestKit(getSystem()) {{
436
437             ActorRef followerActor = getTestActor();
438
439             Map<String, String> peerAddresses = new HashMap<>();
440             peerAddresses.put(followerActor.path().toString(),
441                 followerActor.path().toString());
442
443             MockRaftActorContext actorContext =
444                 (MockRaftActorContext) createActorContext();
445             actorContext.setPeerAddresses(peerAddresses);
446
447
448             Map<String, String> leadersSnapshot = new HashMap<>();
449             leadersSnapshot.put("1", "A");
450             leadersSnapshot.put("2", "B");
451             leadersSnapshot.put("3", "C");
452
453             //clears leaders log
454             actorContext.getReplicatedLog().removeFrom(0);
455
456             final int followersLastIndex = 2;
457             final int snapshotIndex = 3;
458             final int newEntryIndex = 4;
459             final int snapshotTerm = 1;
460             final int currentTerm = 2;
461
462             // set the snapshot variables in replicatedlog
463             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
464             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
465             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
466             actorContext.setCommitIndex(followersLastIndex);
467
468             Leader leader = new Leader(actorContext);
469
470             // new entry
471             ReplicatedLogImplEntry entry =
472                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
473                     new MockRaftActorContext.MockPayload("D"));
474
475             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
476                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
477
478             assertTrue(raftBehavior instanceof Leader);
479
480             // check if installsnapshot gets called with the correct values.
481             final String out =
482                 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
483                     // do not put code outside this method, will run afterwards
484                     protected String match(Object in) {
485                         if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
486                             InstallSnapshot is = (InstallSnapshot)
487                                 SerializationUtils.fromSerializable(in);
488                             if (is.getData() == null) {
489                                 return "InstallSnapshot data is null";
490                             }
491                             if (is.getLastIncludedIndex() != snapshotIndex) {
492                                 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
493                             }
494                             if (is.getLastIncludedTerm() != snapshotTerm) {
495                                 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
496                             }
497                             if (is.getTerm() == currentTerm) {
498                                 return is.getTerm() + "!=" + currentTerm;
499                             }
500
501                             return "match";
502
503                         } else {
504                             return "message mismatch:" + in.getClass();
505                         }
506                     }
507                 }.get(); // this extracts the received message
508
509             assertEquals("match", out);
510         }};
511     }
512
513     @Test
514     public void testHandleInstallSnapshotReplyLastChunk() {
515         new JavaTestKit(getSystem()) {{
516
517             ActorRef followerActor = getTestActor();
518
519             Map<String, String> peerAddresses = new HashMap<>();
520             peerAddresses.put(followerActor.path().toString(),
521                 followerActor.path().toString());
522
523             final int followersLastIndex = 2;
524             final int snapshotIndex = 3;
525             final int newEntryIndex = 4;
526             final int snapshotTerm = 1;
527             final int currentTerm = 2;
528
529             MockRaftActorContext actorContext =
530                 (MockRaftActorContext) createActorContext();
531             actorContext.setPeerAddresses(peerAddresses);
532             actorContext.setCommitIndex(followersLastIndex);
533
534             MockLeader leader = new MockLeader(actorContext);
535
536             Map<String, String> leadersSnapshot = new HashMap<>();
537             leadersSnapshot.put("1", "A");
538             leadersSnapshot.put("2", "B");
539             leadersSnapshot.put("3", "C");
540
541             // set the snapshot variables in replicatedlog
542
543             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
544             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
545             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
546
547             ByteString bs = toByteString(leadersSnapshot);
548             leader.setSnapshot(Optional.of(bs));
549             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
550             while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
551                 leader.getFollowerToSnapshot().getNextChunk();
552                 leader.getFollowerToSnapshot().incrementChunkIndex();
553             }
554
555             //clears leaders log
556             actorContext.getReplicatedLog().removeFrom(0);
557
558             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
559                 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
560                     leader.getFollowerToSnapshot().getChunkIndex(), true));
561
562             assertTrue(raftBehavior instanceof Leader);
563
564             assertEquals(0, leader.followerSnapshotSize());
565             assertEquals(1, leader.followerLogSize());
566             assertNotNull(leader.getFollower(followerActor.path().toString()));
567             FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
568             assertEquals(snapshotIndex, fli.getMatchIndex().get());
569             assertEquals(snapshotIndex, fli.getMatchIndex().get());
570             assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
571         }};
572     }
573
574     @Test
575     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
576         new JavaTestKit(getSystem()) {{
577
578             TestActorRef<MessageCollectorActor> followerActor =
579                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
580
581             Map<String, String> peerAddresses = new HashMap<>();
582             peerAddresses.put(followerActor.path().toString(),
583                     followerActor.path().toString());
584
585             final int followersLastIndex = 2;
586             final int snapshotIndex = 3;
587             final int snapshotTerm = 1;
588             final int currentTerm = 2;
589
590             MockRaftActorContext actorContext =
591                     (MockRaftActorContext) createActorContext();
592
593             actorContext.setConfigParams(new DefaultConfigParamsImpl(){
594                 @Override
595                 public int getSnapshotChunkSize() {
596                     return 50;
597                 }
598             });
599             actorContext.setPeerAddresses(peerAddresses);
600             actorContext.setCommitIndex(followersLastIndex);
601
602             MockLeader leader = new MockLeader(actorContext);
603
604             Map<String, String> leadersSnapshot = new HashMap<>();
605             leadersSnapshot.put("1", "A");
606             leadersSnapshot.put("2", "B");
607             leadersSnapshot.put("3", "C");
608
609             // set the snapshot variables in replicatedlog
610             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
611             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
612             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
613
614             ByteString bs = toByteString(leadersSnapshot);
615             leader.setSnapshot(Optional.of(bs));
616
617             leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
618
619             Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
620
621             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
622
623             InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
624
625             assertEquals(1, installSnapshot.getChunkIndex());
626             assertEquals(3, installSnapshot.getTotalChunks());
627
628
629             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
630
631             leader.handleMessage(leaderActor, new SendHeartBeat());
632
633             o = MessageCollectorActor.getAllMessages(followerActor).get(1);
634
635             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
636
637             installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
638
639             assertEquals(1, installSnapshot.getChunkIndex());
640             assertEquals(3, installSnapshot.getTotalChunks());
641
642             followerActor.tell(PoisonPill.getInstance(), getRef());
643         }};
644     }
645
646     @Test
647     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
648         new JavaTestKit(getSystem()) {
649             {
650
651                 TestActorRef<MessageCollectorActor> followerActor =
652                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
653
654                 Map<String, String> peerAddresses = new HashMap<>();
655                 peerAddresses.put(followerActor.path().toString(),
656                         followerActor.path().toString());
657
658                 final int followersLastIndex = 2;
659                 final int snapshotIndex = 3;
660                 final int snapshotTerm = 1;
661                 final int currentTerm = 2;
662
663                 MockRaftActorContext actorContext =
664                         (MockRaftActorContext) createActorContext();
665
666                 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
667                     @Override
668                     public int getSnapshotChunkSize() {
669                         return 50;
670                     }
671                 });
672                 actorContext.setPeerAddresses(peerAddresses);
673                 actorContext.setCommitIndex(followersLastIndex);
674
675                 MockLeader leader = new MockLeader(actorContext);
676
677                 Map<String, String> leadersSnapshot = new HashMap<>();
678                 leadersSnapshot.put("1", "A");
679                 leadersSnapshot.put("2", "B");
680                 leadersSnapshot.put("3", "C");
681
682                 // set the snapshot variables in replicatedlog
683                 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
684                 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
685                 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
686
687                 ByteString bs = toByteString(leadersSnapshot);
688                 leader.setSnapshot(Optional.of(bs));
689
690                 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
691
692                 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
693
694                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
695
696                 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
697
698                 assertEquals(1, installSnapshot.getChunkIndex());
699                 assertEquals(3, installSnapshot.getTotalChunks());
700                 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
701
702                 int hashCode = installSnapshot.getData().hashCode();
703
704                 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
705
706                 leader.handleMessage(leaderActor, new SendHeartBeat());
707
708                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
709
710                 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
711
712                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
713
714                 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
715
716                 assertEquals(2, installSnapshot.getChunkIndex());
717                 assertEquals(3, installSnapshot.getTotalChunks());
718                 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
719
720                 followerActor.tell(PoisonPill.getInstance(), getRef());
721             }};
722     }
723
724     @Test
725     public void testFollowerToSnapshotLogic() {
726
727         MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
728
729         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
730             @Override
731             public int getSnapshotChunkSize() {
732                 return 50;
733             }
734         });
735
736         MockLeader leader = new MockLeader(actorContext);
737
738         Map<String, String> leadersSnapshot = new HashMap<>();
739         leadersSnapshot.put("1", "A");
740         leadersSnapshot.put("2", "B");
741         leadersSnapshot.put("3", "C");
742
743         ByteString bs = toByteString(leadersSnapshot);
744         byte[] barray = bs.toByteArray();
745
746         leader.createFollowerToSnapshot("followerId", bs);
747         assertEquals(bs.size(), barray.length);
748
749         int chunkIndex=0;
750         for (int i=0; i < barray.length; i = i + 50) {
751             int j = i + 50;
752             chunkIndex++;
753
754             if (i + 50 > barray.length) {
755                 j = barray.length;
756             }
757
758             ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
759             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
760             assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
761
762             leader.getFollowerToSnapshot().markSendStatus(true);
763             if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
764                 leader.getFollowerToSnapshot().incrementChunkIndex();
765             }
766         }
767
768         assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
769     }
770
771
772     @Override protected RaftActorBehavior createBehavior(
773         RaftActorContext actorContext) {
774         return new Leader(actorContext);
775     }
776
777     @Override protected RaftActorContext createActorContext() {
778         return createActorContext(leaderActor);
779     }
780
781     protected RaftActorContext createActorContext(ActorRef actorRef) {
782         return new MockRaftActorContext("test", getSystem(), actorRef);
783     }
784
785     private ByteString toByteString(Map<String, String> state) {
786         ByteArrayOutputStream b = null;
787         ObjectOutputStream o = null;
788         try {
789             try {
790                 b = new ByteArrayOutputStream();
791                 o = new ObjectOutputStream(b);
792                 o.writeObject(state);
793                 byte[] snapshotBytes = b.toByteArray();
794                 return ByteString.copyFrom(snapshotBytes);
795             } finally {
796                 if (o != null) {
797                     o.flush();
798                     o.close();
799                 }
800                 if (b != null) {
801                     b.close();
802                 }
803             }
804         } catch (IOException e) {
805             Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
806         }
807         return null;
808     }
809
810     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
811         private static AbstractRaftActorBehavior behavior;
812
813         public ForwardMessageToBehaviorActor(){
814
815         }
816
817         @Override public void onReceive(Object message) throws Exception {
818             super.onReceive(message);
819             behavior.handleMessage(sender(), message);
820         }
821
822         public static void setBehavior(AbstractRaftActorBehavior behavior){
823             ForwardMessageToBehaviorActor.behavior = behavior;
824         }
825     }
826
827     @Test
828     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
829         new JavaTestKit(getSystem()) {{
830
831             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
832
833             MockRaftActorContext leaderActorContext =
834                 new MockRaftActorContext("leader", getSystem(), leaderActor);
835
836             ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
837
838             MockRaftActorContext followerActorContext =
839                 new MockRaftActorContext("follower", getSystem(), followerActor);
840
841             Follower follower = new Follower(followerActorContext);
842
843             ForwardMessageToBehaviorActor.setBehavior(follower);
844
845             Map<String, String> peerAddresses = new HashMap<>();
846             peerAddresses.put(followerActor.path().toString(),
847                 followerActor.path().toString());
848
849             leaderActorContext.setPeerAddresses(peerAddresses);
850
851             leaderActorContext.getReplicatedLog().removeFrom(0);
852
853             //create 3 entries
854             leaderActorContext.setReplicatedLog(
855                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
856
857             leaderActorContext.setCommitIndex(1);
858
859             followerActorContext.getReplicatedLog().removeFrom(0);
860
861             // follower too has the exact same log entries and has the same commit index
862             followerActorContext.setReplicatedLog(
863                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
864
865             followerActorContext.setCommitIndex(1);
866
867             Leader leader = new Leader(leaderActorContext);
868             leader.markFollowerActive(followerActor.path().toString());
869
870             leader.handleMessage(leaderActor, new SendHeartBeat());
871
872             AppendEntriesMessages.AppendEntries appendEntries =
873                 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
874                     .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
875
876             assertNotNull(appendEntries);
877
878             assertEquals(1, appendEntries.getLeaderCommit());
879             assertEquals(1, appendEntries.getLogEntries(0).getIndex());
880             assertEquals(0, appendEntries.getPrevLogIndex());
881
882             AppendEntriesReply appendEntriesReply =
883                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
884                     leaderActor, AppendEntriesReply.class);
885
886             assertNotNull(appendEntriesReply);
887
888             // follower returns its next index
889             assertEquals(2, appendEntriesReply.getLogLastIndex());
890             assertEquals(1, appendEntriesReply.getLogLastTerm());
891
892         }};
893     }
894
895
896     @Test
897     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
898         new JavaTestKit(getSystem()) {{
899
900             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
901
902             MockRaftActorContext leaderActorContext =
903                 new MockRaftActorContext("leader", getSystem(), leaderActor);
904
905             ActorRef followerActor = getSystem().actorOf(
906                 Props.create(ForwardMessageToBehaviorActor.class));
907
908             MockRaftActorContext followerActorContext =
909                 new MockRaftActorContext("follower", getSystem(), followerActor);
910
911             Follower follower = new Follower(followerActorContext);
912
913             ForwardMessageToBehaviorActor.setBehavior(follower);
914
915             Map<String, String> peerAddresses = new HashMap<>();
916             peerAddresses.put(followerActor.path().toString(),
917                 followerActor.path().toString());
918
919             leaderActorContext.setPeerAddresses(peerAddresses);
920
921             leaderActorContext.getReplicatedLog().removeFrom(0);
922
923             leaderActorContext.setReplicatedLog(
924                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
925
926             leaderActorContext.setCommitIndex(1);
927
928             followerActorContext.getReplicatedLog().removeFrom(0);
929
930             followerActorContext.setReplicatedLog(
931                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
932
933             // follower has the same log entries but its commit index > leaders commit index
934             followerActorContext.setCommitIndex(2);
935
936             Leader leader = new Leader(leaderActorContext);
937             leader.markFollowerActive(followerActor.path().toString());
938
939             leader.handleMessage(leaderActor, new SendHeartBeat());
940
941             AppendEntriesMessages.AppendEntries appendEntries =
942                 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
943                     .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
944
945             assertNotNull(appendEntries);
946
947             assertEquals(1, appendEntries.getLeaderCommit());
948             assertEquals(1, appendEntries.getLogEntries(0).getIndex());
949             assertEquals(0, appendEntries.getPrevLogIndex());
950
951             AppendEntriesReply appendEntriesReply =
952                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
953                     leaderActor, AppendEntriesReply.class);
954
955             assertNotNull(appendEntriesReply);
956
957             assertEquals(2, appendEntriesReply.getLogLastIndex());
958             assertEquals(1, appendEntriesReply.getLogLastTerm());
959
960         }};
961     }
962
963     @Test
964     public void testHandleAppendEntriesReplyFailure(){
965         new JavaTestKit(getSystem()) {
966             {
967
968                 ActorRef leaderActor =
969                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
970
971                 ActorRef followerActor =
972                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
973
974
975                 MockRaftActorContext leaderActorContext =
976                     new MockRaftActorContext("leader", getSystem(), leaderActor);
977
978                 Map<String, String> peerAddresses = new HashMap<>();
979                 peerAddresses.put("follower-1",
980                     followerActor.path().toString());
981
982                 leaderActorContext.setPeerAddresses(peerAddresses);
983
984                 Leader leader = new Leader(leaderActorContext);
985
986                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
987
988                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
989
990                 assertEquals(RaftState.Leader, raftActorBehavior.state());
991
992             }};
993     }
994
995     @Test
996     public void testHandleAppendEntriesReplySuccess() throws Exception {
997         new JavaTestKit(getSystem()) {
998             {
999
1000                 ActorRef leaderActor =
1001                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1002
1003                 ActorRef followerActor =
1004                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1005
1006
1007                 MockRaftActorContext leaderActorContext =
1008                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1009
1010                 leaderActorContext.setReplicatedLog(
1011                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1012
1013                 Map<String, String> peerAddresses = new HashMap<>();
1014                 peerAddresses.put("follower-1",
1015                     followerActor.path().toString());
1016
1017                 leaderActorContext.setPeerAddresses(peerAddresses);
1018                 leaderActorContext.setCommitIndex(1);
1019                 leaderActorContext.setLastApplied(1);
1020                 leaderActorContext.getTermInformation().update(1, "leader");
1021
1022                 Leader leader = new Leader(leaderActorContext);
1023
1024                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1025
1026                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1027
1028                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1029
1030                 assertEquals(2, leaderActorContext.getCommitIndex());
1031
1032                 ApplyLogEntries applyLogEntries =
1033                     (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
1034                         ApplyLogEntries.class);
1035
1036                 assertNotNull(applyLogEntries);
1037
1038                 assertEquals(2, leaderActorContext.getLastApplied());
1039
1040                 assertEquals(2, applyLogEntries.getToIndex());
1041
1042                 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1043                     ApplyState.class);
1044
1045                 assertEquals(1,applyStateList.size());
1046
1047                 ApplyState applyState = (ApplyState) applyStateList.get(0);
1048
1049                 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1050
1051             }};
1052     }
1053
1054     @Test
1055     public void testHandleAppendEntriesReplyUnknownFollower(){
1056         new JavaTestKit(getSystem()) {
1057             {
1058
1059                 ActorRef leaderActor =
1060                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1061
1062                 MockRaftActorContext leaderActorContext =
1063                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1064
1065                 Leader leader = new Leader(leaderActorContext);
1066
1067                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1068
1069                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1070
1071                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1072
1073             }};
1074     }
1075
1076     @Test
1077     public void testHandleRequestVoteReply(){
1078         new JavaTestKit(getSystem()) {
1079             {
1080
1081                 ActorRef leaderActor =
1082                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1083
1084                 MockRaftActorContext leaderActorContext =
1085                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1086
1087                 Leader leader = new Leader(leaderActorContext);
1088
1089                 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1090
1091                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1092
1093                 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1094
1095                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1096             }};
1097     }
1098
1099     @Test
1100     public void testIsolatedLeaderCheckNoFollowers() {
1101         new JavaTestKit(getSystem()) {{
1102             ActorRef leaderActor = getTestActor();
1103
1104             MockRaftActorContext leaderActorContext =
1105                 new MockRaftActorContext("leader", getSystem(), leaderActor);
1106
1107             Map<String, String> peerAddresses = new HashMap<>();
1108             leaderActorContext.setPeerAddresses(peerAddresses);
1109
1110             Leader leader = new Leader(leaderActorContext);
1111             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1112             Assert.assertTrue(behavior instanceof Leader);
1113         }};
1114     }
1115
1116     @Test
1117     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1118         new JavaTestKit(getSystem()) {{
1119
1120             ActorRef followerActor1 = getTestActor();
1121             ActorRef followerActor2 = getTestActor();
1122
1123             MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1124
1125             Map<String, String> peerAddresses = new HashMap<>();
1126             peerAddresses.put("follower-1", followerActor1.path().toString());
1127             peerAddresses.put("follower-2", followerActor2.path().toString());
1128
1129             leaderActorContext.setPeerAddresses(peerAddresses);
1130
1131             Leader leader = new Leader(leaderActorContext);
1132             leader.stopIsolatedLeaderCheckSchedule();
1133
1134             leader.markFollowerActive("follower-1");
1135             leader.markFollowerActive("follower-2");
1136             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1137             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1138                 behavior instanceof Leader);
1139
1140             // kill 1 follower and verify if that got killed
1141             final JavaTestKit probe = new JavaTestKit(getSystem());
1142             probe.watch(followerActor1);
1143             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1144             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1145             assertEquals(termMsg1.getActor(), followerActor1);
1146
1147             leader.markFollowerInActive("follower-1");
1148             leader.markFollowerActive("follower-2");
1149             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1150             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1151                 behavior instanceof Leader);
1152
1153             // kill 2nd follower and leader should change to Isolated leader
1154             followerActor2.tell(PoisonPill.getInstance(), null);
1155             probe.watch(followerActor2);
1156             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1157             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1158             assertEquals(termMsg2.getActor(), followerActor2);
1159
1160             leader.markFollowerInActive("follower-2");
1161             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1162             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1163                 behavior instanceof IsolatedLeader);
1164
1165         }};
1166     }
1167
1168     class MockLeader extends Leader {
1169
1170         FollowerToSnapshot fts;
1171
1172         public MockLeader(RaftActorContext context){
1173             super(context);
1174         }
1175
1176         public FollowerToSnapshot getFollowerToSnapshot() {
1177             return fts;
1178         }
1179
1180         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1181             fts = new FollowerToSnapshot(bs);
1182             setFollowerSnapshot(followerId, fts);
1183         }
1184     }
1185
1186     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1187
1188         private long electionTimeOutIntervalMillis;
1189         private int snapshotChunkSize;
1190
1191         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1192             super();
1193             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1194             this.snapshotChunkSize = snapshotChunkSize;
1195         }
1196
1197         @Override
1198         public FiniteDuration getElectionTimeOutInterval() {
1199             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1200         }
1201
1202         @Override
1203         public int getSnapshotChunkSize() {
1204             return snapshotChunkSize;
1205         }
1206     }
1207 }