Merge "BUG-2218: Keep existing link augmentations during discovery process"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import akka.actor.ActorRef;
4 import akka.actor.PoisonPill;
5 import akka.actor.Props;
6 import akka.actor.Terminated;
7 import akka.testkit.JavaTestKit;
8 import com.google.common.base.Optional;
9 import com.google.common.util.concurrent.Uninterruptibles;
10 import com.google.protobuf.ByteString;
11 import java.io.ByteArrayOutputStream;
12 import java.io.IOException;
13 import java.io.ObjectOutputStream;
14 import java.util.HashMap;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.concurrent.TimeUnit;
18 import org.junit.Assert;
19 import org.junit.Test;
20 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
21 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
22 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
23 import org.opendaylight.controller.cluster.raft.RaftActorContext;
24 import org.opendaylight.controller.cluster.raft.RaftState;
25 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
26 import org.opendaylight.controller.cluster.raft.SerializationUtils;
27 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
28 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
29 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
30 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
31 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
32 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
33 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
34 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
36 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
37 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
39 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
40 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
41 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
42 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
43 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
44 import scala.concurrent.duration.FiniteDuration;
45 import static org.junit.Assert.assertEquals;
46 import static org.junit.Assert.assertNotNull;
47 import static org.junit.Assert.assertTrue;
48
49 public class LeaderTest extends AbstractRaftActorBehaviorTest {
50
51     private ActorRef leaderActor =
52         getSystem().actorOf(Props.create(DoNothingActor.class));
53     private ActorRef senderActor =
54         getSystem().actorOf(Props.create(DoNothingActor.class));
55
56     @Test
57     public void testHandleMessageForUnknownMessage() throws Exception {
58         new JavaTestKit(getSystem()) {{
59             Leader leader =
60                 new Leader(createActorContext());
61
62             // handle message should return the Leader state when it receives an
63             // unknown message
64             RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
65             Assert.assertTrue(behavior instanceof Leader);
66         }};
67     }
68
69
70     @Test
71     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
72         new JavaTestKit(getSystem()) {{
73
74             new Within(duration("1 seconds")) {
75                 protected void run() {
76
77                     ActorRef followerActor = getTestActor();
78
79                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
80
81                     Map<String, String> peerAddresses = new HashMap<>();
82
83                     peerAddresses.put(followerActor.path().toString(),
84                         followerActor.path().toString());
85
86                     actorContext.setPeerAddresses(peerAddresses);
87
88                     Leader leader = new Leader(actorContext);
89                     leader.handleMessage(senderActor, new SendHeartBeat());
90
91                     final String out =
92                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
93                             // do not put code outside this method, will run afterwards
94                             protected String match(Object in) {
95                                 Object msg = fromSerializableMessage(in);
96                                 if (msg instanceof AppendEntries) {
97                                     if (((AppendEntries)msg).getTerm() == 0) {
98                                         return "match";
99                                     }
100                                     return null;
101                                 } else {
102                                     throw noMatch();
103                                 }
104                             }
105                         }.get(); // this extracts the received message
106
107                     assertEquals("match", out);
108
109                 }
110             };
111         }};
112     }
113
114     @Test
115     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
116         new JavaTestKit(getSystem()) {{
117
118             new Within(duration("1 seconds")) {
119                 protected void run() {
120
121                     ActorRef followerActor = getTestActor();
122
123                     MockRaftActorContext actorContext =
124                         (MockRaftActorContext) createActorContext();
125
126                     Map<String, String> peerAddresses = new HashMap<>();
127
128                     peerAddresses.put(followerActor.path().toString(),
129                         followerActor.path().toString());
130
131                     actorContext.setPeerAddresses(peerAddresses);
132
133                     Leader leader = new Leader(actorContext);
134                     RaftActorBehavior raftBehavior = leader
135                         .handleMessage(senderActor, new Replicate(null, null,
136                             new MockRaftActorContext.MockReplicatedLogEntry(1,
137                                 100,
138                                 new MockRaftActorContext.MockPayload("foo"))
139                         ));
140
141                     // State should not change
142                     assertTrue(raftBehavior instanceof Leader);
143
144                     final String out =
145                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
146                             // do not put code outside this method, will run afterwards
147                             protected String match(Object in) {
148                                 Object msg = fromSerializableMessage(in);
149                                 if (msg instanceof AppendEntries) {
150                                     if (((AppendEntries)msg).getTerm() == 0) {
151                                         return "match";
152                                     }
153                                     return null;
154                                 } else {
155                                     throw noMatch();
156                                 }
157                             }
158                         }.get(); // this extracts the received message
159
160                     assertEquals("match", out);
161                 }
162             };
163         }};
164     }
165
166     @Test
167     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
168         new JavaTestKit(getSystem()) {{
169
170             new Within(duration("1 seconds")) {
171                 protected void run() {
172
173                     ActorRef raftActor = getTestActor();
174
175                     MockRaftActorContext actorContext =
176                         new MockRaftActorContext("test", getSystem(), raftActor);
177
178                     actorContext.getReplicatedLog().removeFrom(0);
179
180                     actorContext.setReplicatedLog(
181                         new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
182                             .build());
183
184                     Leader leader = new Leader(actorContext);
185                     RaftActorBehavior raftBehavior = leader
186                         .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
187
188                     // State should not change
189                     assertTrue(raftBehavior instanceof Leader);
190
191                     assertEquals(1, actorContext.getCommitIndex());
192
193                     final String out =
194                         new ExpectMsg<String>(duration("1 seconds"),
195                             "match hint") {
196                             // do not put code outside this method, will run afterwards
197                             protected String match(Object in) {
198                                 if (in instanceof ApplyState) {
199                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
200                                         return "match";
201                                     }
202                                     return null;
203                                 } else {
204                                     throw noMatch();
205                                 }
206                             }
207                         }.get(); // this extracts the received message
208
209                     assertEquals("match", out);
210
211                 }
212             };
213         }};
214     }
215
216     @Test
217     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
218         new JavaTestKit(getSystem()) {{
219             ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
220
221             Map<String, String> peerAddresses = new HashMap<>();
222             peerAddresses.put(followerActor.path().toString(),
223                 followerActor.path().toString());
224
225             MockRaftActorContext actorContext =
226                 (MockRaftActorContext) createActorContext(leaderActor);
227             actorContext.setPeerAddresses(peerAddresses);
228
229             Map<String, String> leadersSnapshot = new HashMap<>();
230             leadersSnapshot.put("1", "A");
231             leadersSnapshot.put("2", "B");
232             leadersSnapshot.put("3", "C");
233
234             //clears leaders log
235             actorContext.getReplicatedLog().removeFrom(0);
236
237             final int followersLastIndex = 2;
238             final int snapshotIndex = 3;
239             final int newEntryIndex = 4;
240             final int snapshotTerm = 1;
241             final int currentTerm = 2;
242
243             // set the snapshot variables in replicatedlog
244             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
245             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
246             actorContext.setCommitIndex(followersLastIndex);
247             //set follower timeout to 2 mins, helps during debugging
248             actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
249
250             MockLeader leader = new MockLeader(actorContext);
251
252             // new entry
253             ReplicatedLogImplEntry entry =
254                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
255                     new MockRaftActorContext.MockPayload("D"));
256
257             //update follower timestamp
258             leader.markFollowerActive(followerActor.path().toString());
259
260             ByteString bs = toByteString(leadersSnapshot);
261             leader.setSnapshot(Optional.of(bs));
262             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
263
264             //send first chunk and no InstallSnapshotReply received yet
265             leader.getFollowerToSnapshot().getNextChunk();
266             leader.getFollowerToSnapshot().incrementChunkIndex();
267
268             leader.handleMessage(leaderActor, new SendHeartBeat());
269
270             AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
271                 followerActor, AppendEntries.SERIALIZABLE_CLASS);
272
273             assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
274                 "received", aeproto);
275
276             AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
277
278             assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
279
280             //InstallSnapshotReply received
281             leader.getFollowerToSnapshot().markSendStatus(true);
282
283             leader.handleMessage(senderActor, new SendHeartBeat());
284
285             InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
286                 MessageCollectorActor.getFirstMatching(followerActor,
287                     InstallSnapshot.SERIALIZABLE_CLASS);
288
289             assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
290                 isproto);
291
292             InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
293
294             assertEquals(snapshotIndex, is.getLastIncludedIndex());
295
296         }};
297     }
298
299     @Test
300     public void testSendAppendEntriesSnapshotScenario() {
301         new JavaTestKit(getSystem()) {{
302
303             ActorRef followerActor = getTestActor();
304
305             Map<String, String> peerAddresses = new HashMap<>();
306             peerAddresses.put(followerActor.path().toString(),
307                 followerActor.path().toString());
308
309             MockRaftActorContext actorContext =
310                 (MockRaftActorContext) createActorContext(getRef());
311             actorContext.setPeerAddresses(peerAddresses);
312
313             Map<String, String> leadersSnapshot = new HashMap<>();
314             leadersSnapshot.put("1", "A");
315             leadersSnapshot.put("2", "B");
316             leadersSnapshot.put("3", "C");
317
318             //clears leaders log
319             actorContext.getReplicatedLog().removeFrom(0);
320
321             final int followersLastIndex = 2;
322             final int snapshotIndex = 3;
323             final int newEntryIndex = 4;
324             final int snapshotTerm = 1;
325             final int currentTerm = 2;
326
327             // set the snapshot variables in replicatedlog
328             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
329             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
330             actorContext.setCommitIndex(followersLastIndex);
331
332             Leader leader = new Leader(actorContext);
333
334             // new entry
335             ReplicatedLogImplEntry entry =
336                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
337                     new MockRaftActorContext.MockPayload("D"));
338
339             //update follower timestamp
340             leader.markFollowerActive(followerActor.path().toString());
341
342             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
343             RaftActorBehavior raftBehavior = leader.handleMessage(
344                 senderActor, new Replicate(null, "state-id", entry));
345
346             assertTrue(raftBehavior instanceof Leader);
347
348             // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
349             Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
350                 @Override
351                 protected Boolean match(Object o) throws Exception {
352                     if (o instanceof InitiateInstallSnapshot) {
353                         return true;
354                     }
355                     return false;
356                 }
357             }.get();
358
359             boolean initiateInitiateInstallSnapshot = false;
360             for (Boolean b: matches) {
361                 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
362             }
363
364             assertTrue(initiateInitiateInstallSnapshot);
365         }};
366     }
367
368     @Test
369     public void testInitiateInstallSnapshot() throws Exception {
370         new JavaTestKit(getSystem()) {{
371
372             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
373
374             ActorRef followerActor = getTestActor();
375
376             Map<String, String> peerAddresses = new HashMap<>();
377             peerAddresses.put(followerActor.path().toString(),
378                 followerActor.path().toString());
379
380
381             MockRaftActorContext actorContext =
382                 (MockRaftActorContext) createActorContext(leaderActor);
383             actorContext.setPeerAddresses(peerAddresses);
384
385             Map<String, String> leadersSnapshot = new HashMap<>();
386             leadersSnapshot.put("1", "A");
387             leadersSnapshot.put("2", "B");
388             leadersSnapshot.put("3", "C");
389
390             //clears leaders log
391             actorContext.getReplicatedLog().removeFrom(0);
392
393             final int followersLastIndex = 2;
394             final int snapshotIndex = 3;
395             final int newEntryIndex = 4;
396             final int snapshotTerm = 1;
397             final int currentTerm = 2;
398
399             // set the snapshot variables in replicatedlog
400             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
401             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
402             actorContext.setLastApplied(3);
403             actorContext.setCommitIndex(followersLastIndex);
404
405             Leader leader = new Leader(actorContext);
406             // set the snapshot as absent and check if capture-snapshot is invoked.
407             leader.setSnapshot(Optional.<ByteString>absent());
408
409             // new entry
410             ReplicatedLogImplEntry entry =
411                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
412                     new MockRaftActorContext.MockPayload("D"));
413
414             actorContext.getReplicatedLog().append(entry);
415
416             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
417             RaftActorBehavior raftBehavior = leader.handleMessage(
418                 leaderActor, new InitiateInstallSnapshot());
419
420             CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
421                 getFirstMatching(leaderActor, CaptureSnapshot.class);
422
423             assertNotNull(cs);
424
425             assertTrue(cs.isInstallSnapshotInitiated());
426             assertEquals(3, cs.getLastAppliedIndex());
427             assertEquals(1, cs.getLastAppliedTerm());
428             assertEquals(4, cs.getLastIndex());
429             assertEquals(2, cs.getLastTerm());
430         }};
431     }
432
433     @Test
434     public void testInstallSnapshot() {
435         new JavaTestKit(getSystem()) {{
436
437             ActorRef followerActor = getTestActor();
438
439             Map<String, String> peerAddresses = new HashMap<>();
440             peerAddresses.put(followerActor.path().toString(),
441                 followerActor.path().toString());
442
443             MockRaftActorContext actorContext =
444                 (MockRaftActorContext) createActorContext();
445             actorContext.setPeerAddresses(peerAddresses);
446
447
448             Map<String, String> leadersSnapshot = new HashMap<>();
449             leadersSnapshot.put("1", "A");
450             leadersSnapshot.put("2", "B");
451             leadersSnapshot.put("3", "C");
452
453             //clears leaders log
454             actorContext.getReplicatedLog().removeFrom(0);
455
456             final int followersLastIndex = 2;
457             final int snapshotIndex = 3;
458             final int newEntryIndex = 4;
459             final int snapshotTerm = 1;
460             final int currentTerm = 2;
461
462             // set the snapshot variables in replicatedlog
463             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
464             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
465             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
466             actorContext.setCommitIndex(followersLastIndex);
467
468             Leader leader = new Leader(actorContext);
469
470             // new entry
471             ReplicatedLogImplEntry entry =
472                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
473                     new MockRaftActorContext.MockPayload("D"));
474
475             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
476                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
477
478             assertTrue(raftBehavior instanceof Leader);
479
480             // check if installsnapshot gets called with the correct values.
481             final String out =
482                 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
483                     // do not put code outside this method, will run afterwards
484                     protected String match(Object in) {
485                         if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
486                             InstallSnapshot is = (InstallSnapshot)
487                                 SerializationUtils.fromSerializable(in);
488                             if (is.getData() == null) {
489                                 return "InstallSnapshot data is null";
490                             }
491                             if (is.getLastIncludedIndex() != snapshotIndex) {
492                                 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
493                             }
494                             if (is.getLastIncludedTerm() != snapshotTerm) {
495                                 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
496                             }
497                             if (is.getTerm() == currentTerm) {
498                                 return is.getTerm() + "!=" + currentTerm;
499                             }
500
501                             return "match";
502
503                         } else {
504                             return "message mismatch:" + in.getClass();
505                         }
506                     }
507                 }.get(); // this extracts the received message
508
509             assertEquals("match", out);
510         }};
511     }
512
513     @Test
514     public void testHandleInstallSnapshotReplyLastChunk() {
515         new JavaTestKit(getSystem()) {{
516
517             ActorRef followerActor = getTestActor();
518
519             Map<String, String> peerAddresses = new HashMap<>();
520             peerAddresses.put(followerActor.path().toString(),
521                 followerActor.path().toString());
522
523             final int followersLastIndex = 2;
524             final int snapshotIndex = 3;
525             final int newEntryIndex = 4;
526             final int snapshotTerm = 1;
527             final int currentTerm = 2;
528
529             MockRaftActorContext actorContext =
530                 (MockRaftActorContext) createActorContext();
531             actorContext.setPeerAddresses(peerAddresses);
532             actorContext.setCommitIndex(followersLastIndex);
533
534             MockLeader leader = new MockLeader(actorContext);
535
536             Map<String, String> leadersSnapshot = new HashMap<>();
537             leadersSnapshot.put("1", "A");
538             leadersSnapshot.put("2", "B");
539             leadersSnapshot.put("3", "C");
540
541             // set the snapshot variables in replicatedlog
542
543             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
544             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
545             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
546
547             ByteString bs = toByteString(leadersSnapshot);
548             leader.setSnapshot(Optional.of(bs));
549             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
550             while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
551                 leader.getFollowerToSnapshot().getNextChunk();
552                 leader.getFollowerToSnapshot().incrementChunkIndex();
553             }
554
555             //clears leaders log
556             actorContext.getReplicatedLog().removeFrom(0);
557
558             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
559                 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
560                     leader.getFollowerToSnapshot().getChunkIndex(), true));
561
562             assertTrue(raftBehavior instanceof Leader);
563
564             assertEquals(leader.mapFollowerToSnapshot.size(), 0);
565             assertEquals(leader.followerToLog.size(), 1);
566             assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
567             FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
568             assertEquals(snapshotIndex, fli.getMatchIndex().get());
569             assertEquals(snapshotIndex, fli.getMatchIndex().get());
570             assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
571         }};
572     }
573
574     @Test
575     public void testFollowerToSnapshotLogic() {
576
577         MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
578
579         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
580             @Override
581             public int getSnapshotChunkSize() {
582                 return 50;
583             }
584         });
585
586         MockLeader leader = new MockLeader(actorContext);
587
588         Map<String, String> leadersSnapshot = new HashMap<>();
589         leadersSnapshot.put("1", "A");
590         leadersSnapshot.put("2", "B");
591         leadersSnapshot.put("3", "C");
592
593         ByteString bs = toByteString(leadersSnapshot);
594         byte[] barray = bs.toByteArray();
595
596         leader.createFollowerToSnapshot("followerId", bs);
597         assertEquals(bs.size(), barray.length);
598
599         int chunkIndex=0;
600         for (int i=0; i < barray.length; i = i + 50) {
601             int j = i + 50;
602             chunkIndex++;
603
604             if (i + 50 > barray.length) {
605                 j = barray.length;
606             }
607
608             ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
609             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
610             assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
611
612             leader.getFollowerToSnapshot().markSendStatus(true);
613             if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
614                 leader.getFollowerToSnapshot().incrementChunkIndex();
615             }
616         }
617
618         assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
619     }
620
621
622     @Override protected RaftActorBehavior createBehavior(
623         RaftActorContext actorContext) {
624         return new Leader(actorContext);
625     }
626
627     @Override protected RaftActorContext createActorContext() {
628         return createActorContext(leaderActor);
629     }
630
631     protected RaftActorContext createActorContext(ActorRef actorRef) {
632         return new MockRaftActorContext("test", getSystem(), actorRef);
633     }
634
635     private ByteString toByteString(Map<String, String> state) {
636         ByteArrayOutputStream b = null;
637         ObjectOutputStream o = null;
638         try {
639             try {
640                 b = new ByteArrayOutputStream();
641                 o = new ObjectOutputStream(b);
642                 o.writeObject(state);
643                 byte[] snapshotBytes = b.toByteArray();
644                 return ByteString.copyFrom(snapshotBytes);
645             } finally {
646                 if (o != null) {
647                     o.flush();
648                     o.close();
649                 }
650                 if (b != null) {
651                     b.close();
652                 }
653             }
654         } catch (IOException e) {
655             Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
656         }
657         return null;
658     }
659
660     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
661         private static AbstractRaftActorBehavior behavior;
662
663         public ForwardMessageToBehaviorActor(){
664
665         }
666
667         @Override public void onReceive(Object message) throws Exception {
668             super.onReceive(message);
669             behavior.handleMessage(sender(), message);
670         }
671
672         public static void setBehavior(AbstractRaftActorBehavior behavior){
673             ForwardMessageToBehaviorActor.behavior = behavior;
674         }
675     }
676
677     @Test
678     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
679         new JavaTestKit(getSystem()) {{
680
681             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
682
683             MockRaftActorContext leaderActorContext =
684                 new MockRaftActorContext("leader", getSystem(), leaderActor);
685
686             ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
687
688             MockRaftActorContext followerActorContext =
689                 new MockRaftActorContext("follower", getSystem(), followerActor);
690
691             Follower follower = new Follower(followerActorContext);
692
693             ForwardMessageToBehaviorActor.setBehavior(follower);
694
695             Map<String, String> peerAddresses = new HashMap<>();
696             peerAddresses.put(followerActor.path().toString(),
697                 followerActor.path().toString());
698
699             leaderActorContext.setPeerAddresses(peerAddresses);
700
701             leaderActorContext.getReplicatedLog().removeFrom(0);
702
703             //create 3 entries
704             leaderActorContext.setReplicatedLog(
705                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
706
707             leaderActorContext.setCommitIndex(1);
708
709             followerActorContext.getReplicatedLog().removeFrom(0);
710
711             // follower too has the exact same log entries and has the same commit index
712             followerActorContext.setReplicatedLog(
713                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
714
715             followerActorContext.setCommitIndex(1);
716
717             Leader leader = new Leader(leaderActorContext);
718             leader.markFollowerActive(followerActor.path().toString());
719
720             leader.handleMessage(leaderActor, new SendHeartBeat());
721
722             AppendEntriesMessages.AppendEntries appendEntries =
723                 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
724                     .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
725
726             assertNotNull(appendEntries);
727
728             assertEquals(1, appendEntries.getLeaderCommit());
729             assertEquals(1, appendEntries.getLogEntries(0).getIndex());
730             assertEquals(0, appendEntries.getPrevLogIndex());
731
732             AppendEntriesReply appendEntriesReply =
733                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
734                     leaderActor, AppendEntriesReply.class);
735
736             assertNotNull(appendEntriesReply);
737
738             // follower returns its next index
739             assertEquals(2, appendEntriesReply.getLogLastIndex());
740             assertEquals(1, appendEntriesReply.getLogLastTerm());
741
742         }};
743     }
744
745
746     @Test
747     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
748         new JavaTestKit(getSystem()) {{
749
750             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
751
752             MockRaftActorContext leaderActorContext =
753                 new MockRaftActorContext("leader", getSystem(), leaderActor);
754
755             ActorRef followerActor = getSystem().actorOf(
756                 Props.create(ForwardMessageToBehaviorActor.class));
757
758             MockRaftActorContext followerActorContext =
759                 new MockRaftActorContext("follower", getSystem(), followerActor);
760
761             Follower follower = new Follower(followerActorContext);
762
763             ForwardMessageToBehaviorActor.setBehavior(follower);
764
765             Map<String, String> peerAddresses = new HashMap<>();
766             peerAddresses.put(followerActor.path().toString(),
767                 followerActor.path().toString());
768
769             leaderActorContext.setPeerAddresses(peerAddresses);
770
771             leaderActorContext.getReplicatedLog().removeFrom(0);
772
773             leaderActorContext.setReplicatedLog(
774                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
775
776             leaderActorContext.setCommitIndex(1);
777
778             followerActorContext.getReplicatedLog().removeFrom(0);
779
780             followerActorContext.setReplicatedLog(
781                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
782
783             // follower has the same log entries but its commit index > leaders commit index
784             followerActorContext.setCommitIndex(2);
785
786             Leader leader = new Leader(leaderActorContext);
787             leader.markFollowerActive(followerActor.path().toString());
788
789             leader.handleMessage(leaderActor, new SendHeartBeat());
790
791             AppendEntriesMessages.AppendEntries appendEntries =
792                 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
793                     .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
794
795             assertNotNull(appendEntries);
796
797             assertEquals(1, appendEntries.getLeaderCommit());
798             assertEquals(1, appendEntries.getLogEntries(0).getIndex());
799             assertEquals(0, appendEntries.getPrevLogIndex());
800
801             AppendEntriesReply appendEntriesReply =
802                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
803                     leaderActor, AppendEntriesReply.class);
804
805             assertNotNull(appendEntriesReply);
806
807             assertEquals(2, appendEntriesReply.getLogLastIndex());
808             assertEquals(1, appendEntriesReply.getLogLastTerm());
809
810         }};
811     }
812
813     @Test
814     public void testHandleAppendEntriesReplyFailure(){
815         new JavaTestKit(getSystem()) {
816             {
817
818                 ActorRef leaderActor =
819                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
820
821                 ActorRef followerActor =
822                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
823
824
825                 MockRaftActorContext leaderActorContext =
826                     new MockRaftActorContext("leader", getSystem(), leaderActor);
827
828                 Map<String, String> peerAddresses = new HashMap<>();
829                 peerAddresses.put("follower-1",
830                     followerActor.path().toString());
831
832                 leaderActorContext.setPeerAddresses(peerAddresses);
833
834                 Leader leader = new Leader(leaderActorContext);
835
836                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
837
838                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
839
840                 assertEquals(RaftState.Leader, raftActorBehavior.state());
841
842             }};
843     }
844
845     @Test
846     public void testHandleAppendEntriesReplySuccess() throws Exception {
847         new JavaTestKit(getSystem()) {
848             {
849
850                 ActorRef leaderActor =
851                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
852
853                 ActorRef followerActor =
854                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
855
856
857                 MockRaftActorContext leaderActorContext =
858                     new MockRaftActorContext("leader", getSystem(), leaderActor);
859
860                 leaderActorContext.setReplicatedLog(
861                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
862
863                 Map<String, String> peerAddresses = new HashMap<>();
864                 peerAddresses.put("follower-1",
865                     followerActor.path().toString());
866
867                 leaderActorContext.setPeerAddresses(peerAddresses);
868                 leaderActorContext.setCommitIndex(1);
869                 leaderActorContext.setLastApplied(1);
870                 leaderActorContext.getTermInformation().update(1, "leader");
871
872                 Leader leader = new Leader(leaderActorContext);
873
874                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
875
876                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
877
878                 assertEquals(RaftState.Leader, raftActorBehavior.state());
879
880                 assertEquals(2, leaderActorContext.getCommitIndex());
881
882                 ApplyLogEntries applyLogEntries =
883                     (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
884                         ApplyLogEntries.class);
885
886                 assertNotNull(applyLogEntries);
887
888                 assertEquals(2, leaderActorContext.getLastApplied());
889
890                 assertEquals(2, applyLogEntries.getToIndex());
891
892                 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
893                     ApplyState.class);
894
895                 assertEquals(1,applyStateList.size());
896
897                 ApplyState applyState = (ApplyState) applyStateList.get(0);
898
899                 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
900
901             }};
902     }
903
904     @Test
905     public void testHandleAppendEntriesReplyUnknownFollower(){
906         new JavaTestKit(getSystem()) {
907             {
908
909                 ActorRef leaderActor =
910                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
911
912                 MockRaftActorContext leaderActorContext =
913                     new MockRaftActorContext("leader", getSystem(), leaderActor);
914
915                 Leader leader = new Leader(leaderActorContext);
916
917                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
918
919                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
920
921                 assertEquals(RaftState.Leader, raftActorBehavior.state());
922
923             }};
924     }
925
926     @Test
927     public void testHandleRequestVoteReply(){
928         new JavaTestKit(getSystem()) {
929             {
930
931                 ActorRef leaderActor =
932                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
933
934                 MockRaftActorContext leaderActorContext =
935                     new MockRaftActorContext("leader", getSystem(), leaderActor);
936
937                 Leader leader = new Leader(leaderActorContext);
938
939                 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
940
941                 assertEquals(RaftState.Leader, raftActorBehavior.state());
942
943                 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
944
945                 assertEquals(RaftState.Leader, raftActorBehavior.state());
946             }};
947     }
948
949     @Test
950     public void testIsolatedLeaderCheckNoFollowers() {
951         new JavaTestKit(getSystem()) {{
952             ActorRef leaderActor = getTestActor();
953
954             MockRaftActorContext leaderActorContext =
955                 new MockRaftActorContext("leader", getSystem(), leaderActor);
956
957             Map<String, String> peerAddresses = new HashMap<>();
958             leaderActorContext.setPeerAddresses(peerAddresses);
959
960             Leader leader = new Leader(leaderActorContext);
961             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
962             Assert.assertTrue(behavior instanceof Leader);
963         }};
964     }
965
966     @Test
967     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
968         new JavaTestKit(getSystem()) {{
969
970             ActorRef followerActor1 = getTestActor();
971             ActorRef followerActor2 = getTestActor();
972
973             MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
974
975             Map<String, String> peerAddresses = new HashMap<>();
976             peerAddresses.put("follower-1", followerActor1.path().toString());
977             peerAddresses.put("follower-2", followerActor2.path().toString());
978
979             leaderActorContext.setPeerAddresses(peerAddresses);
980
981             Leader leader = new Leader(leaderActorContext);
982             leader.stopIsolatedLeaderCheckSchedule();
983
984             leader.markFollowerActive("follower-1");
985             leader.markFollowerActive("follower-2");
986             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
987             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
988                 behavior instanceof Leader);
989
990             // kill 1 follower and verify if that got killed
991             final JavaTestKit probe = new JavaTestKit(getSystem());
992             probe.watch(followerActor1);
993             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
994             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
995             assertEquals(termMsg1.getActor(), followerActor1);
996
997             //sleep enough for all the follower stopwatches to lapse
998             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
999                 getElectionTimeOutInterval().toMillis(), TimeUnit.MILLISECONDS);
1000
1001             leader.markFollowerActive("follower-2");
1002             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1003             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1004                 behavior instanceof Leader);
1005
1006             // kill 2nd follower and leader should change to Isolated leader
1007             followerActor2.tell(PoisonPill.getInstance(), null);
1008             probe.watch(followerActor2);
1009             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1010             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1011             assertEquals(termMsg2.getActor(), followerActor2);
1012
1013             //sleep enough for the remaining the follower stopwatches to lapse
1014             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
1015                 getElectionTimeOutInterval().toMillis(), TimeUnit.MILLISECONDS);
1016
1017             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1018             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1019                 behavior instanceof IsolatedLeader);
1020
1021         }};
1022     }
1023
1024     class MockLeader extends Leader {
1025
1026         FollowerToSnapshot fts;
1027
1028         public MockLeader(RaftActorContext context){
1029             super(context);
1030         }
1031
1032         public FollowerToSnapshot getFollowerToSnapshot() {
1033             return fts;
1034         }
1035
1036         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1037             fts = new FollowerToSnapshot(bs);
1038             mapFollowerToSnapshot.put(followerId, fts);
1039
1040         }
1041     }
1042
1043     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1044
1045         private long electionTimeOutIntervalMillis;
1046         private int snapshotChunkSize;
1047
1048         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1049             super();
1050             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1051             this.snapshotChunkSize = snapshotChunkSize;
1052         }
1053
1054         @Override
1055         public FiniteDuration getElectionTimeOutInterval() {
1056             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1057         }
1058
1059         @Override
1060         public int getSnapshotChunkSize() {
1061             return snapshotChunkSize;
1062         }
1063     }
1064 }