Merge "Add MD-SAL artifacts"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Assert;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftState;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
47 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
48 import scala.concurrent.duration.FiniteDuration;
49
50 public class LeaderTest extends AbstractRaftActorBehaviorTest {
51
52     private ActorRef leaderActor =
53         getSystem().actorOf(Props.create(DoNothingActor.class));
54     private ActorRef senderActor =
55         getSystem().actorOf(Props.create(DoNothingActor.class));
56
57     @Test
58     public void testHandleMessageForUnknownMessage() throws Exception {
59         new JavaTestKit(getSystem()) {{
60             Leader leader =
61                 new Leader(createActorContext());
62
63             // handle message should return the Leader state when it receives an
64             // unknown message
65             RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
66             Assert.assertTrue(behavior instanceof Leader);
67         }};
68     }
69
70
71     @Test
72     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
73         new JavaTestKit(getSystem()) {{
74
75             new Within(duration("1 seconds")) {
76                 protected void run() {
77
78                     ActorRef followerActor = getTestActor();
79
80                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
81
82                     Map<String, String> peerAddresses = new HashMap<>();
83
84                     peerAddresses.put(followerActor.path().toString(),
85                         followerActor.path().toString());
86
87                     actorContext.setPeerAddresses(peerAddresses);
88
89                     Leader leader = new Leader(actorContext);
90                     leader.handleMessage(senderActor, new SendHeartBeat());
91
92                     final String out =
93                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
94                             // do not put code outside this method, will run afterwards
95                             protected String match(Object in) {
96                                 Object msg = fromSerializableMessage(in);
97                                 if (msg instanceof AppendEntries) {
98                                     if (((AppendEntries)msg).getTerm() == 0) {
99                                         return "match";
100                                     }
101                                     return null;
102                                 } else {
103                                     throw noMatch();
104                                 }
105                             }
106                         }.get(); // this extracts the received message
107
108                     assertEquals("match", out);
109
110                 }
111             };
112         }};
113     }
114
115     @Test
116     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
117         new JavaTestKit(getSystem()) {{
118
119             new Within(duration("1 seconds")) {
120                 protected void run() {
121
122                     ActorRef followerActor = getTestActor();
123
124                     MockRaftActorContext actorContext =
125                         (MockRaftActorContext) createActorContext();
126
127                     Map<String, String> peerAddresses = new HashMap<>();
128
129                     peerAddresses.put(followerActor.path().toString(),
130                             followerActor.path().toString());
131
132                     actorContext.setPeerAddresses(peerAddresses);
133
134                     Leader leader = new Leader(actorContext);
135                     RaftActorBehavior raftBehavior = leader
136                         .handleMessage(senderActor, new Replicate(null, null,
137                             new MockRaftActorContext.MockReplicatedLogEntry(1,
138                                 100,
139                                 new MockRaftActorContext.MockPayload("foo"))
140                         ));
141
142                     // State should not change
143                     assertTrue(raftBehavior instanceof Leader);
144
145                     final String out =
146                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
147                             // do not put code outside this method, will run afterwards
148                             protected String match(Object in) {
149                                 Object msg = fromSerializableMessage(in);
150                                 if (msg instanceof AppendEntries) {
151                                     if (((AppendEntries)msg).getTerm() == 0) {
152                                         return "match";
153                                     }
154                                     return null;
155                                 } else {
156                                     throw noMatch();
157                                 }
158                             }
159                         }.get(); // this extracts the received message
160
161                     assertEquals("match", out);
162                 }
163             };
164         }};
165     }
166
167     @Test
168     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
169         new JavaTestKit(getSystem()) {{
170
171             new Within(duration("1 seconds")) {
172                 protected void run() {
173
174                     ActorRef raftActor = getTestActor();
175
176                     MockRaftActorContext actorContext =
177                         new MockRaftActorContext("test", getSystem(), raftActor);
178
179                     actorContext.getReplicatedLog().removeFrom(0);
180
181                     actorContext.setReplicatedLog(
182                         new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
183                             .build());
184
185                     Leader leader = new Leader(actorContext);
186                     RaftActorBehavior raftBehavior = leader
187                         .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
188
189                     // State should not change
190                     assertTrue(raftBehavior instanceof Leader);
191
192                     assertEquals(1, actorContext.getCommitIndex());
193
194                     final String out =
195                         new ExpectMsg<String>(duration("1 seconds"),
196                             "match hint") {
197                             // do not put code outside this method, will run afterwards
198                             protected String match(Object in) {
199                                 if (in instanceof ApplyState) {
200                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
201                                         return "match";
202                                     }
203                                     return null;
204                                 } else {
205                                     throw noMatch();
206                                 }
207                             }
208                         }.get(); // this extracts the received message
209
210                     assertEquals("match", out);
211
212                 }
213             };
214         }};
215     }
216
217     @Test
218     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
219         new JavaTestKit(getSystem()) {{
220             ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
221
222             Map<String, String> peerAddresses = new HashMap<>();
223             peerAddresses.put(followerActor.path().toString(),
224                 followerActor.path().toString());
225
226             MockRaftActorContext actorContext =
227                 (MockRaftActorContext) createActorContext(leaderActor);
228             actorContext.setPeerAddresses(peerAddresses);
229
230             Map<String, String> leadersSnapshot = new HashMap<>();
231             leadersSnapshot.put("1", "A");
232             leadersSnapshot.put("2", "B");
233             leadersSnapshot.put("3", "C");
234
235             //clears leaders log
236             actorContext.getReplicatedLog().removeFrom(0);
237
238             final int followersLastIndex = 2;
239             final int snapshotIndex = 3;
240             final int newEntryIndex = 4;
241             final int snapshotTerm = 1;
242             final int currentTerm = 2;
243
244             // set the snapshot variables in replicatedlog
245             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
246             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
247             actorContext.setCommitIndex(followersLastIndex);
248             //set follower timeout to 2 mins, helps during debugging
249             actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
250
251             MockLeader leader = new MockLeader(actorContext);
252
253             // new entry
254             ReplicatedLogImplEntry entry =
255                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
256                     new MockRaftActorContext.MockPayload("D"));
257
258             //update follower timestamp
259             leader.markFollowerActive(followerActor.path().toString());
260
261             ByteString bs = toByteString(leadersSnapshot);
262             leader.setSnapshot(Optional.of(bs));
263             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
264
265             //send first chunk and no InstallSnapshotReply received yet
266             leader.getFollowerToSnapshot().getNextChunk();
267             leader.getFollowerToSnapshot().incrementChunkIndex();
268
269             leader.handleMessage(leaderActor, new SendHeartBeat());
270
271             AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
272                 followerActor, AppendEntries.SERIALIZABLE_CLASS);
273
274             assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
275                 "received", aeproto);
276
277             AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
278
279             assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
280
281             //InstallSnapshotReply received
282             leader.getFollowerToSnapshot().markSendStatus(true);
283
284             leader.handleMessage(senderActor, new SendHeartBeat());
285
286             InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
287                 MessageCollectorActor.getFirstMatching(followerActor,
288                     InstallSnapshot.SERIALIZABLE_CLASS);
289
290             assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
291                 isproto);
292
293             InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
294
295             assertEquals(snapshotIndex, is.getLastIncludedIndex());
296
297         }};
298     }
299
300     @Test
301     public void testSendAppendEntriesSnapshotScenario() {
302         new JavaTestKit(getSystem()) {{
303
304             ActorRef followerActor = getTestActor();
305
306             Map<String, String> peerAddresses = new HashMap<>();
307             peerAddresses.put(followerActor.path().toString(),
308                 followerActor.path().toString());
309
310             MockRaftActorContext actorContext =
311                 (MockRaftActorContext) createActorContext(getRef());
312             actorContext.setPeerAddresses(peerAddresses);
313
314             Map<String, String> leadersSnapshot = new HashMap<>();
315             leadersSnapshot.put("1", "A");
316             leadersSnapshot.put("2", "B");
317             leadersSnapshot.put("3", "C");
318
319             //clears leaders log
320             actorContext.getReplicatedLog().removeFrom(0);
321
322             final int followersLastIndex = 2;
323             final int snapshotIndex = 3;
324             final int newEntryIndex = 4;
325             final int snapshotTerm = 1;
326             final int currentTerm = 2;
327
328             // set the snapshot variables in replicatedlog
329             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
330             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
331             actorContext.setCommitIndex(followersLastIndex);
332
333             Leader leader = new Leader(actorContext);
334
335             // new entry
336             ReplicatedLogImplEntry entry =
337                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
338                     new MockRaftActorContext.MockPayload("D"));
339
340             //update follower timestamp
341             leader.markFollowerActive(followerActor.path().toString());
342
343             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
344             RaftActorBehavior raftBehavior = leader.handleMessage(
345                 senderActor, new Replicate(null, "state-id", entry));
346
347             assertTrue(raftBehavior instanceof Leader);
348
349             // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
350             Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
351                 @Override
352                 protected Boolean match(Object o) throws Exception {
353                     if (o instanceof InitiateInstallSnapshot) {
354                         return true;
355                     }
356                     return false;
357                 }
358             }.get();
359
360             boolean initiateInitiateInstallSnapshot = false;
361             for (Boolean b: matches) {
362                 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
363             }
364
365             assertTrue(initiateInitiateInstallSnapshot);
366         }};
367     }
368
369     @Test
370     public void testInitiateInstallSnapshot() throws Exception {
371         new JavaTestKit(getSystem()) {{
372
373             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
374
375             ActorRef followerActor = getTestActor();
376
377             Map<String, String> peerAddresses = new HashMap<>();
378             peerAddresses.put(followerActor.path().toString(),
379                 followerActor.path().toString());
380
381
382             MockRaftActorContext actorContext =
383                 (MockRaftActorContext) createActorContext(leaderActor);
384             actorContext.setPeerAddresses(peerAddresses);
385
386             Map<String, String> leadersSnapshot = new HashMap<>();
387             leadersSnapshot.put("1", "A");
388             leadersSnapshot.put("2", "B");
389             leadersSnapshot.put("3", "C");
390
391             //clears leaders log
392             actorContext.getReplicatedLog().removeFrom(0);
393
394             final int followersLastIndex = 2;
395             final int snapshotIndex = 3;
396             final int newEntryIndex = 4;
397             final int snapshotTerm = 1;
398             final int currentTerm = 2;
399
400             // set the snapshot variables in replicatedlog
401             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
402             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
403             actorContext.setLastApplied(3);
404             actorContext.setCommitIndex(followersLastIndex);
405
406             Leader leader = new Leader(actorContext);
407             // set the snapshot as absent and check if capture-snapshot is invoked.
408             leader.setSnapshot(Optional.<ByteString>absent());
409
410             // new entry
411             ReplicatedLogImplEntry entry =
412                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
413                     new MockRaftActorContext.MockPayload("D"));
414
415             actorContext.getReplicatedLog().append(entry);
416
417             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
418             RaftActorBehavior raftBehavior = leader.handleMessage(
419                 leaderActor, new InitiateInstallSnapshot());
420
421             CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
422                 getFirstMatching(leaderActor, CaptureSnapshot.class);
423
424             assertNotNull(cs);
425
426             assertTrue(cs.isInstallSnapshotInitiated());
427             assertEquals(3, cs.getLastAppliedIndex());
428             assertEquals(1, cs.getLastAppliedTerm());
429             assertEquals(4, cs.getLastIndex());
430             assertEquals(2, cs.getLastTerm());
431         }};
432     }
433
434     @Test
435     public void testInstallSnapshot() {
436         new JavaTestKit(getSystem()) {{
437
438             ActorRef followerActor = getTestActor();
439
440             Map<String, String> peerAddresses = new HashMap<>();
441             peerAddresses.put(followerActor.path().toString(),
442                 followerActor.path().toString());
443
444             MockRaftActorContext actorContext =
445                 (MockRaftActorContext) createActorContext();
446             actorContext.setPeerAddresses(peerAddresses);
447
448
449             Map<String, String> leadersSnapshot = new HashMap<>();
450             leadersSnapshot.put("1", "A");
451             leadersSnapshot.put("2", "B");
452             leadersSnapshot.put("3", "C");
453
454             //clears leaders log
455             actorContext.getReplicatedLog().removeFrom(0);
456
457             final int followersLastIndex = 2;
458             final int snapshotIndex = 3;
459             final int newEntryIndex = 4;
460             final int snapshotTerm = 1;
461             final int currentTerm = 2;
462
463             // set the snapshot variables in replicatedlog
464             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
465             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
466             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
467             actorContext.setCommitIndex(followersLastIndex);
468
469             Leader leader = new Leader(actorContext);
470
471             // new entry
472             ReplicatedLogImplEntry entry =
473                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
474                     new MockRaftActorContext.MockPayload("D"));
475
476             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
477                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
478
479             assertTrue(raftBehavior instanceof Leader);
480
481             // check if installsnapshot gets called with the correct values.
482             final String out =
483                 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
484                     // do not put code outside this method, will run afterwards
485                     protected String match(Object in) {
486                         if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
487                             InstallSnapshot is = (InstallSnapshot)
488                                 SerializationUtils.fromSerializable(in);
489                             if (is.getData() == null) {
490                                 return "InstallSnapshot data is null";
491                             }
492                             if (is.getLastIncludedIndex() != snapshotIndex) {
493                                 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
494                             }
495                             if (is.getLastIncludedTerm() != snapshotTerm) {
496                                 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
497                             }
498                             if (is.getTerm() == currentTerm) {
499                                 return is.getTerm() + "!=" + currentTerm;
500                             }
501
502                             return "match";
503
504                         } else {
505                             return "message mismatch:" + in.getClass();
506                         }
507                     }
508                 }.get(); // this extracts the received message
509
510             assertEquals("match", out);
511         }};
512     }
513
514     @Test
515     public void testHandleInstallSnapshotReplyLastChunk() {
516         new JavaTestKit(getSystem()) {{
517
518             ActorRef followerActor = getTestActor();
519
520             Map<String, String> peerAddresses = new HashMap<>();
521             peerAddresses.put(followerActor.path().toString(),
522                 followerActor.path().toString());
523
524             final int followersLastIndex = 2;
525             final int snapshotIndex = 3;
526             final int newEntryIndex = 4;
527             final int snapshotTerm = 1;
528             final int currentTerm = 2;
529
530             MockRaftActorContext actorContext =
531                 (MockRaftActorContext) createActorContext();
532             actorContext.setPeerAddresses(peerAddresses);
533             actorContext.setCommitIndex(followersLastIndex);
534
535             MockLeader leader = new MockLeader(actorContext);
536
537             Map<String, String> leadersSnapshot = new HashMap<>();
538             leadersSnapshot.put("1", "A");
539             leadersSnapshot.put("2", "B");
540             leadersSnapshot.put("3", "C");
541
542             // set the snapshot variables in replicatedlog
543
544             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
545             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
546             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
547
548             ByteString bs = toByteString(leadersSnapshot);
549             leader.setSnapshot(Optional.of(bs));
550             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
551             while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
552                 leader.getFollowerToSnapshot().getNextChunk();
553                 leader.getFollowerToSnapshot().incrementChunkIndex();
554             }
555
556             //clears leaders log
557             actorContext.getReplicatedLog().removeFrom(0);
558
559             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
560                 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
561                     leader.getFollowerToSnapshot().getChunkIndex(), true));
562
563             assertTrue(raftBehavior instanceof Leader);
564
565             assertEquals(leader.mapFollowerToSnapshot.size(), 0);
566             assertEquals(leader.followerToLog.size(), 1);
567             assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
568             FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
569             assertEquals(snapshotIndex, fli.getMatchIndex().get());
570             assertEquals(snapshotIndex, fli.getMatchIndex().get());
571             assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
572         }};
573     }
574
575     @Test
576     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
577         new JavaTestKit(getSystem()) {{
578
579             TestActorRef<MessageCollectorActor> followerActor =
580                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
581
582             Map<String, String> peerAddresses = new HashMap<>();
583             peerAddresses.put(followerActor.path().toString(),
584                     followerActor.path().toString());
585
586             final int followersLastIndex = 2;
587             final int snapshotIndex = 3;
588             final int snapshotTerm = 1;
589             final int currentTerm = 2;
590
591             MockRaftActorContext actorContext =
592                     (MockRaftActorContext) createActorContext();
593
594             actorContext.setConfigParams(new DefaultConfigParamsImpl(){
595                 @Override
596                 public int getSnapshotChunkSize() {
597                     return 50;
598                 }
599             });
600             actorContext.setPeerAddresses(peerAddresses);
601             actorContext.setCommitIndex(followersLastIndex);
602
603             MockLeader leader = new MockLeader(actorContext);
604
605             Map<String, String> leadersSnapshot = new HashMap<>();
606             leadersSnapshot.put("1", "A");
607             leadersSnapshot.put("2", "B");
608             leadersSnapshot.put("3", "C");
609
610             // set the snapshot variables in replicatedlog
611             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
612             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
613             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
614
615             ByteString bs = toByteString(leadersSnapshot);
616             leader.setSnapshot(Optional.of(bs));
617
618             leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
619
620             Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
621
622             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
623
624             InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
625
626             assertEquals(1, installSnapshot.getChunkIndex());
627             assertEquals(3, installSnapshot.getTotalChunks());
628
629
630             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
631
632             leader.handleMessage(leaderActor, new SendHeartBeat());
633
634             o = MessageCollectorActor.getAllMessages(followerActor).get(1);
635
636             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
637
638             installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
639
640             assertEquals(1, installSnapshot.getChunkIndex());
641             assertEquals(3, installSnapshot.getTotalChunks());
642
643             followerActor.tell(PoisonPill.getInstance(), getRef());
644         }};
645     }
646
647     @Test
648     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
649         new JavaTestKit(getSystem()) {
650             {
651
652                 TestActorRef<MessageCollectorActor> followerActor =
653                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
654
655                 Map<String, String> peerAddresses = new HashMap<>();
656                 peerAddresses.put(followerActor.path().toString(),
657                         followerActor.path().toString());
658
659                 final int followersLastIndex = 2;
660                 final int snapshotIndex = 3;
661                 final int snapshotTerm = 1;
662                 final int currentTerm = 2;
663
664                 MockRaftActorContext actorContext =
665                         (MockRaftActorContext) createActorContext();
666
667                 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
668                     @Override
669                     public int getSnapshotChunkSize() {
670                         return 50;
671                     }
672                 });
673                 actorContext.setPeerAddresses(peerAddresses);
674                 actorContext.setCommitIndex(followersLastIndex);
675
676                 MockLeader leader = new MockLeader(actorContext);
677
678                 Map<String, String> leadersSnapshot = new HashMap<>();
679                 leadersSnapshot.put("1", "A");
680                 leadersSnapshot.put("2", "B");
681                 leadersSnapshot.put("3", "C");
682
683                 // set the snapshot variables in replicatedlog
684                 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
685                 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
686                 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
687
688                 ByteString bs = toByteString(leadersSnapshot);
689                 leader.setSnapshot(Optional.of(bs));
690
691                 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
692
693                 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
694
695                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
696
697                 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
698
699                 assertEquals(1, installSnapshot.getChunkIndex());
700                 assertEquals(3, installSnapshot.getTotalChunks());
701                 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
702
703                 int hashCode = installSnapshot.getData().hashCode();
704
705                 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
706
707                 leader.handleMessage(leaderActor, new SendHeartBeat());
708
709                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
710
711                 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
712
713                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
714
715                 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
716
717                 assertEquals(2, installSnapshot.getChunkIndex());
718                 assertEquals(3, installSnapshot.getTotalChunks());
719                 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
720
721                 followerActor.tell(PoisonPill.getInstance(), getRef());
722             }};
723     }
724
725     @Test
726     public void testFollowerToSnapshotLogic() {
727
728         MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
729
730         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
731             @Override
732             public int getSnapshotChunkSize() {
733                 return 50;
734             }
735         });
736
737         MockLeader leader = new MockLeader(actorContext);
738
739         Map<String, String> leadersSnapshot = new HashMap<>();
740         leadersSnapshot.put("1", "A");
741         leadersSnapshot.put("2", "B");
742         leadersSnapshot.put("3", "C");
743
744         ByteString bs = toByteString(leadersSnapshot);
745         byte[] barray = bs.toByteArray();
746
747         leader.createFollowerToSnapshot("followerId", bs);
748         assertEquals(bs.size(), barray.length);
749
750         int chunkIndex=0;
751         for (int i=0; i < barray.length; i = i + 50) {
752             int j = i + 50;
753             chunkIndex++;
754
755             if (i + 50 > barray.length) {
756                 j = barray.length;
757             }
758
759             ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
760             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
761             assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
762
763             leader.getFollowerToSnapshot().markSendStatus(true);
764             if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
765                 leader.getFollowerToSnapshot().incrementChunkIndex();
766             }
767         }
768
769         assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
770     }
771
772
773     @Override protected RaftActorBehavior createBehavior(
774         RaftActorContext actorContext) {
775         return new Leader(actorContext);
776     }
777
778     @Override protected RaftActorContext createActorContext() {
779         return createActorContext(leaderActor);
780     }
781
782     protected RaftActorContext createActorContext(ActorRef actorRef) {
783         return new MockRaftActorContext("test", getSystem(), actorRef);
784     }
785
786     private ByteString toByteString(Map<String, String> state) {
787         ByteArrayOutputStream b = null;
788         ObjectOutputStream o = null;
789         try {
790             try {
791                 b = new ByteArrayOutputStream();
792                 o = new ObjectOutputStream(b);
793                 o.writeObject(state);
794                 byte[] snapshotBytes = b.toByteArray();
795                 return ByteString.copyFrom(snapshotBytes);
796             } finally {
797                 if (o != null) {
798                     o.flush();
799                     o.close();
800                 }
801                 if (b != null) {
802                     b.close();
803                 }
804             }
805         } catch (IOException e) {
806             Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
807         }
808         return null;
809     }
810
811     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
812         private static AbstractRaftActorBehavior behavior;
813
814         public ForwardMessageToBehaviorActor(){
815
816         }
817
818         @Override public void onReceive(Object message) throws Exception {
819             super.onReceive(message);
820             behavior.handleMessage(sender(), message);
821         }
822
823         public static void setBehavior(AbstractRaftActorBehavior behavior){
824             ForwardMessageToBehaviorActor.behavior = behavior;
825         }
826     }
827
828     @Test
829     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
830         new JavaTestKit(getSystem()) {{
831
832             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
833
834             MockRaftActorContext leaderActorContext =
835                 new MockRaftActorContext("leader", getSystem(), leaderActor);
836
837             ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
838
839             MockRaftActorContext followerActorContext =
840                 new MockRaftActorContext("follower", getSystem(), followerActor);
841
842             Follower follower = new Follower(followerActorContext);
843
844             ForwardMessageToBehaviorActor.setBehavior(follower);
845
846             Map<String, String> peerAddresses = new HashMap<>();
847             peerAddresses.put(followerActor.path().toString(),
848                 followerActor.path().toString());
849
850             leaderActorContext.setPeerAddresses(peerAddresses);
851
852             leaderActorContext.getReplicatedLog().removeFrom(0);
853
854             //create 3 entries
855             leaderActorContext.setReplicatedLog(
856                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
857
858             leaderActorContext.setCommitIndex(1);
859
860             followerActorContext.getReplicatedLog().removeFrom(0);
861
862             // follower too has the exact same log entries and has the same commit index
863             followerActorContext.setReplicatedLog(
864                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
865
866             followerActorContext.setCommitIndex(1);
867
868             Leader leader = new Leader(leaderActorContext);
869             leader.markFollowerActive(followerActor.path().toString());
870
871             leader.handleMessage(leaderActor, new SendHeartBeat());
872
873             AppendEntriesMessages.AppendEntries appendEntries =
874                 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
875                     .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
876
877             assertNotNull(appendEntries);
878
879             assertEquals(1, appendEntries.getLeaderCommit());
880             assertEquals(1, appendEntries.getLogEntries(0).getIndex());
881             assertEquals(0, appendEntries.getPrevLogIndex());
882
883             AppendEntriesReply appendEntriesReply =
884                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
885                     leaderActor, AppendEntriesReply.class);
886
887             assertNotNull(appendEntriesReply);
888
889             // follower returns its next index
890             assertEquals(2, appendEntriesReply.getLogLastIndex());
891             assertEquals(1, appendEntriesReply.getLogLastTerm());
892
893         }};
894     }
895
896
897     @Test
898     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
899         new JavaTestKit(getSystem()) {{
900
901             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
902
903             MockRaftActorContext leaderActorContext =
904                 new MockRaftActorContext("leader", getSystem(), leaderActor);
905
906             ActorRef followerActor = getSystem().actorOf(
907                 Props.create(ForwardMessageToBehaviorActor.class));
908
909             MockRaftActorContext followerActorContext =
910                 new MockRaftActorContext("follower", getSystem(), followerActor);
911
912             Follower follower = new Follower(followerActorContext);
913
914             ForwardMessageToBehaviorActor.setBehavior(follower);
915
916             Map<String, String> peerAddresses = new HashMap<>();
917             peerAddresses.put(followerActor.path().toString(),
918                 followerActor.path().toString());
919
920             leaderActorContext.setPeerAddresses(peerAddresses);
921
922             leaderActorContext.getReplicatedLog().removeFrom(0);
923
924             leaderActorContext.setReplicatedLog(
925                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
926
927             leaderActorContext.setCommitIndex(1);
928
929             followerActorContext.getReplicatedLog().removeFrom(0);
930
931             followerActorContext.setReplicatedLog(
932                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
933
934             // follower has the same log entries but its commit index > leaders commit index
935             followerActorContext.setCommitIndex(2);
936
937             Leader leader = new Leader(leaderActorContext);
938             leader.markFollowerActive(followerActor.path().toString());
939
940             leader.handleMessage(leaderActor, new SendHeartBeat());
941
942             AppendEntriesMessages.AppendEntries appendEntries =
943                 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
944                     .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
945
946             assertNotNull(appendEntries);
947
948             assertEquals(1, appendEntries.getLeaderCommit());
949             assertEquals(1, appendEntries.getLogEntries(0).getIndex());
950             assertEquals(0, appendEntries.getPrevLogIndex());
951
952             AppendEntriesReply appendEntriesReply =
953                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
954                     leaderActor, AppendEntriesReply.class);
955
956             assertNotNull(appendEntriesReply);
957
958             assertEquals(2, appendEntriesReply.getLogLastIndex());
959             assertEquals(1, appendEntriesReply.getLogLastTerm());
960
961         }};
962     }
963
964     @Test
965     public void testHandleAppendEntriesReplyFailure(){
966         new JavaTestKit(getSystem()) {
967             {
968
969                 ActorRef leaderActor =
970                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
971
972                 ActorRef followerActor =
973                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
974
975
976                 MockRaftActorContext leaderActorContext =
977                     new MockRaftActorContext("leader", getSystem(), leaderActor);
978
979                 Map<String, String> peerAddresses = new HashMap<>();
980                 peerAddresses.put("follower-1",
981                     followerActor.path().toString());
982
983                 leaderActorContext.setPeerAddresses(peerAddresses);
984
985                 Leader leader = new Leader(leaderActorContext);
986
987                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
988
989                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
990
991                 assertEquals(RaftState.Leader, raftActorBehavior.state());
992
993             }};
994     }
995
996     @Test
997     public void testHandleAppendEntriesReplySuccess() throws Exception {
998         new JavaTestKit(getSystem()) {
999             {
1000
1001                 ActorRef leaderActor =
1002                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1003
1004                 ActorRef followerActor =
1005                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1006
1007
1008                 MockRaftActorContext leaderActorContext =
1009                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1010
1011                 leaderActorContext.setReplicatedLog(
1012                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1013
1014                 Map<String, String> peerAddresses = new HashMap<>();
1015                 peerAddresses.put("follower-1",
1016                     followerActor.path().toString());
1017
1018                 leaderActorContext.setPeerAddresses(peerAddresses);
1019                 leaderActorContext.setCommitIndex(1);
1020                 leaderActorContext.setLastApplied(1);
1021                 leaderActorContext.getTermInformation().update(1, "leader");
1022
1023                 Leader leader = new Leader(leaderActorContext);
1024
1025                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1026
1027                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1028
1029                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1030
1031                 assertEquals(2, leaderActorContext.getCommitIndex());
1032
1033                 ApplyLogEntries applyLogEntries =
1034                     (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
1035                         ApplyLogEntries.class);
1036
1037                 assertNotNull(applyLogEntries);
1038
1039                 assertEquals(2, leaderActorContext.getLastApplied());
1040
1041                 assertEquals(2, applyLogEntries.getToIndex());
1042
1043                 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1044                     ApplyState.class);
1045
1046                 assertEquals(1,applyStateList.size());
1047
1048                 ApplyState applyState = (ApplyState) applyStateList.get(0);
1049
1050                 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1051
1052             }};
1053     }
1054
1055     @Test
1056     public void testHandleAppendEntriesReplyUnknownFollower(){
1057         new JavaTestKit(getSystem()) {
1058             {
1059
1060                 ActorRef leaderActor =
1061                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1062
1063                 MockRaftActorContext leaderActorContext =
1064                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1065
1066                 Leader leader = new Leader(leaderActorContext);
1067
1068                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1069
1070                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1071
1072                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1073
1074             }};
1075     }
1076
1077     @Test
1078     public void testHandleRequestVoteReply(){
1079         new JavaTestKit(getSystem()) {
1080             {
1081
1082                 ActorRef leaderActor =
1083                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1084
1085                 MockRaftActorContext leaderActorContext =
1086                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1087
1088                 Leader leader = new Leader(leaderActorContext);
1089
1090                 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1091
1092                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1093
1094                 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1095
1096                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1097             }};
1098     }
1099
1100     @Test
1101     public void testIsolatedLeaderCheckNoFollowers() {
1102         new JavaTestKit(getSystem()) {{
1103             ActorRef leaderActor = getTestActor();
1104
1105             MockRaftActorContext leaderActorContext =
1106                 new MockRaftActorContext("leader", getSystem(), leaderActor);
1107
1108             Map<String, String> peerAddresses = new HashMap<>();
1109             leaderActorContext.setPeerAddresses(peerAddresses);
1110
1111             Leader leader = new Leader(leaderActorContext);
1112             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1113             Assert.assertTrue(behavior instanceof Leader);
1114         }};
1115     }
1116
1117     @Test
1118     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1119         new JavaTestKit(getSystem()) {{
1120
1121             ActorRef followerActor1 = getTestActor();
1122             ActorRef followerActor2 = getTestActor();
1123
1124             MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1125
1126             Map<String, String> peerAddresses = new HashMap<>();
1127             peerAddresses.put("follower-1", followerActor1.path().toString());
1128             peerAddresses.put("follower-2", followerActor2.path().toString());
1129
1130             leaderActorContext.setPeerAddresses(peerAddresses);
1131
1132             Leader leader = new Leader(leaderActorContext);
1133             leader.stopIsolatedLeaderCheckSchedule();
1134
1135             leader.markFollowerActive("follower-1");
1136             leader.markFollowerActive("follower-2");
1137             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1138             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1139                 behavior instanceof Leader);
1140
1141             // kill 1 follower and verify if that got killed
1142             final JavaTestKit probe = new JavaTestKit(getSystem());
1143             probe.watch(followerActor1);
1144             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1145             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1146             assertEquals(termMsg1.getActor(), followerActor1);
1147
1148             leader.markFollowerInActive("follower-1");
1149             leader.markFollowerActive("follower-2");
1150             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1151             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1152                 behavior instanceof Leader);
1153
1154             // kill 2nd follower and leader should change to Isolated leader
1155             followerActor2.tell(PoisonPill.getInstance(), null);
1156             probe.watch(followerActor2);
1157             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1158             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1159             assertEquals(termMsg2.getActor(), followerActor2);
1160
1161             leader.markFollowerInActive("follower-2");
1162             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1163             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1164                 behavior instanceof IsolatedLeader);
1165
1166         }};
1167     }
1168
1169     class MockLeader extends Leader {
1170
1171         FollowerToSnapshot fts;
1172
1173         public MockLeader(RaftActorContext context){
1174             super(context);
1175         }
1176
1177         public FollowerToSnapshot getFollowerToSnapshot() {
1178             return fts;
1179         }
1180
1181         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1182             fts = new FollowerToSnapshot(bs);
1183             mapFollowerToSnapshot.put(followerId, fts);
1184
1185         }
1186     }
1187
1188     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1189
1190         private long electionTimeOutIntervalMillis;
1191         private int snapshotChunkSize;
1192
1193         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1194             super();
1195             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1196             this.snapshotChunkSize = snapshotChunkSize;
1197         }
1198
1199         @Override
1200         public FiniteDuration getElectionTimeOutInterval() {
1201             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1202         }
1203
1204         @Override
1205         public int getSnapshotChunkSize() {
1206             return snapshotChunkSize;
1207         }
1208     }
1209 }