Add MD-SAL artifacts
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import akka.actor.ActorRef;
4 import akka.actor.PoisonPill;
5 import akka.actor.Props;
6 import akka.actor.Terminated;
7 import akka.testkit.JavaTestKit;
8 import com.google.common.base.Optional;
9 import com.google.protobuf.ByteString;
10 import java.io.ByteArrayOutputStream;
11 import java.io.IOException;
12 import java.io.ObjectOutputStream;
13 import java.util.HashMap;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.concurrent.TimeUnit;
17 import org.junit.Assert;
18 import org.junit.Test;
19 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
20 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
21 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
22 import org.opendaylight.controller.cluster.raft.RaftActorContext;
23 import org.opendaylight.controller.cluster.raft.RaftState;
24 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
25 import org.opendaylight.controller.cluster.raft.SerializationUtils;
26 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
27 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
28 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
29 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
30 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
31 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
32 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
33 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
34 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
35 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
36 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
37 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
38 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
39 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
40 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
41 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
42 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
43 import scala.concurrent.duration.FiniteDuration;
44 import static org.junit.Assert.assertEquals;
45 import static org.junit.Assert.assertNotNull;
46 import static org.junit.Assert.assertTrue;
47
48 public class LeaderTest extends AbstractRaftActorBehaviorTest {
49
50     private ActorRef leaderActor =
51         getSystem().actorOf(Props.create(DoNothingActor.class));
52     private ActorRef senderActor =
53         getSystem().actorOf(Props.create(DoNothingActor.class));
54
55     @Test
56     public void testHandleMessageForUnknownMessage() throws Exception {
57         new JavaTestKit(getSystem()) {{
58             Leader leader =
59                 new Leader(createActorContext());
60
61             // handle message should return the Leader state when it receives an
62             // unknown message
63             RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
64             Assert.assertTrue(behavior instanceof Leader);
65         }};
66     }
67
68
69     @Test
70     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
71         new JavaTestKit(getSystem()) {{
72
73             new Within(duration("1 seconds")) {
74                 protected void run() {
75
76                     ActorRef followerActor = getTestActor();
77
78                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
79
80                     Map<String, String> peerAddresses = new HashMap<>();
81
82                     peerAddresses.put(followerActor.path().toString(),
83                         followerActor.path().toString());
84
85                     actorContext.setPeerAddresses(peerAddresses);
86
87                     Leader leader = new Leader(actorContext);
88                     leader.handleMessage(senderActor, new SendHeartBeat());
89
90                     final String out =
91                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
92                             // do not put code outside this method, will run afterwards
93                             protected String match(Object in) {
94                                 Object msg = fromSerializableMessage(in);
95                                 if (msg instanceof AppendEntries) {
96                                     if (((AppendEntries)msg).getTerm() == 0) {
97                                         return "match";
98                                     }
99                                     return null;
100                                 } else {
101                                     throw noMatch();
102                                 }
103                             }
104                         }.get(); // this extracts the received message
105
106                     assertEquals("match", out);
107
108                 }
109             };
110         }};
111     }
112
113     @Test
114     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
115         new JavaTestKit(getSystem()) {{
116
117             new Within(duration("1 seconds")) {
118                 protected void run() {
119
120                     ActorRef followerActor = getTestActor();
121
122                     MockRaftActorContext actorContext =
123                         (MockRaftActorContext) createActorContext();
124
125                     Map<String, String> peerAddresses = new HashMap<>();
126
127                     peerAddresses.put(followerActor.path().toString(),
128                         followerActor.path().toString());
129
130                     actorContext.setPeerAddresses(peerAddresses);
131
132                     Leader leader = new Leader(actorContext);
133                     RaftActorBehavior raftBehavior = leader
134                         .handleMessage(senderActor, new Replicate(null, null,
135                             new MockRaftActorContext.MockReplicatedLogEntry(1,
136                                 100,
137                                 new MockRaftActorContext.MockPayload("foo"))
138                         ));
139
140                     // State should not change
141                     assertTrue(raftBehavior instanceof Leader);
142
143                     final String out =
144                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
145                             // do not put code outside this method, will run afterwards
146                             protected String match(Object in) {
147                                 Object msg = fromSerializableMessage(in);
148                                 if (msg instanceof AppendEntries) {
149                                     if (((AppendEntries)msg).getTerm() == 0) {
150                                         return "match";
151                                     }
152                                     return null;
153                                 } else {
154                                     throw noMatch();
155                                 }
156                             }
157                         }.get(); // this extracts the received message
158
159                     assertEquals("match", out);
160                 }
161             };
162         }};
163     }
164
165     @Test
166     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
167         new JavaTestKit(getSystem()) {{
168
169             new Within(duration("1 seconds")) {
170                 protected void run() {
171
172                     ActorRef raftActor = getTestActor();
173
174                     MockRaftActorContext actorContext =
175                         new MockRaftActorContext("test", getSystem(), raftActor);
176
177                     actorContext.getReplicatedLog().removeFrom(0);
178
179                     actorContext.setReplicatedLog(
180                         new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
181                             .build());
182
183                     Leader leader = new Leader(actorContext);
184                     RaftActorBehavior raftBehavior = leader
185                         .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
186
187                     // State should not change
188                     assertTrue(raftBehavior instanceof Leader);
189
190                     assertEquals(1, actorContext.getCommitIndex());
191
192                     final String out =
193                         new ExpectMsg<String>(duration("1 seconds"),
194                             "match hint") {
195                             // do not put code outside this method, will run afterwards
196                             protected String match(Object in) {
197                                 if (in instanceof ApplyState) {
198                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
199                                         return "match";
200                                     }
201                                     return null;
202                                 } else {
203                                     throw noMatch();
204                                 }
205                             }
206                         }.get(); // this extracts the received message
207
208                     assertEquals("match", out);
209
210                 }
211             };
212         }};
213     }
214
215     @Test
216     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
217         new JavaTestKit(getSystem()) {{
218             ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
219
220             Map<String, String> peerAddresses = new HashMap<>();
221             peerAddresses.put(followerActor.path().toString(),
222                 followerActor.path().toString());
223
224             MockRaftActorContext actorContext =
225                 (MockRaftActorContext) createActorContext(leaderActor);
226             actorContext.setPeerAddresses(peerAddresses);
227
228             Map<String, String> leadersSnapshot = new HashMap<>();
229             leadersSnapshot.put("1", "A");
230             leadersSnapshot.put("2", "B");
231             leadersSnapshot.put("3", "C");
232
233             //clears leaders log
234             actorContext.getReplicatedLog().removeFrom(0);
235
236             final int followersLastIndex = 2;
237             final int snapshotIndex = 3;
238             final int newEntryIndex = 4;
239             final int snapshotTerm = 1;
240             final int currentTerm = 2;
241
242             // set the snapshot variables in replicatedlog
243             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
244             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
245             actorContext.setCommitIndex(followersLastIndex);
246             //set follower timeout to 2 mins, helps during debugging
247             actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
248
249             MockLeader leader = new MockLeader(actorContext);
250
251             // new entry
252             ReplicatedLogImplEntry entry =
253                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
254                     new MockRaftActorContext.MockPayload("D"));
255
256             //update follower timestamp
257             leader.markFollowerActive(followerActor.path().toString());
258
259             ByteString bs = toByteString(leadersSnapshot);
260             leader.setSnapshot(Optional.of(bs));
261             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
262
263             //send first chunk and no InstallSnapshotReply received yet
264             leader.getFollowerToSnapshot().getNextChunk();
265             leader.getFollowerToSnapshot().incrementChunkIndex();
266
267             leader.handleMessage(leaderActor, new SendHeartBeat());
268
269             AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
270                 followerActor, AppendEntries.SERIALIZABLE_CLASS);
271
272             assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
273                 "received", aeproto);
274
275             AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
276
277             assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
278
279             //InstallSnapshotReply received
280             leader.getFollowerToSnapshot().markSendStatus(true);
281
282             leader.handleMessage(senderActor, new SendHeartBeat());
283
284             InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
285                 MessageCollectorActor.getFirstMatching(followerActor,
286                     InstallSnapshot.SERIALIZABLE_CLASS);
287
288             assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
289                 isproto);
290
291             InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
292
293             assertEquals(snapshotIndex, is.getLastIncludedIndex());
294
295         }};
296     }
297
298     @Test
299     public void testSendAppendEntriesSnapshotScenario() {
300         new JavaTestKit(getSystem()) {{
301
302             ActorRef followerActor = getTestActor();
303
304             Map<String, String> peerAddresses = new HashMap<>();
305             peerAddresses.put(followerActor.path().toString(),
306                 followerActor.path().toString());
307
308             MockRaftActorContext actorContext =
309                 (MockRaftActorContext) createActorContext(getRef());
310             actorContext.setPeerAddresses(peerAddresses);
311
312             Map<String, String> leadersSnapshot = new HashMap<>();
313             leadersSnapshot.put("1", "A");
314             leadersSnapshot.put("2", "B");
315             leadersSnapshot.put("3", "C");
316
317             //clears leaders log
318             actorContext.getReplicatedLog().removeFrom(0);
319
320             final int followersLastIndex = 2;
321             final int snapshotIndex = 3;
322             final int newEntryIndex = 4;
323             final int snapshotTerm = 1;
324             final int currentTerm = 2;
325
326             // set the snapshot variables in replicatedlog
327             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
328             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
329             actorContext.setCommitIndex(followersLastIndex);
330
331             Leader leader = new Leader(actorContext);
332
333             // new entry
334             ReplicatedLogImplEntry entry =
335                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
336                     new MockRaftActorContext.MockPayload("D"));
337
338             //update follower timestamp
339             leader.markFollowerActive(followerActor.path().toString());
340
341             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
342             RaftActorBehavior raftBehavior = leader.handleMessage(
343                 senderActor, new Replicate(null, "state-id", entry));
344
345             assertTrue(raftBehavior instanceof Leader);
346
347             // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
348             Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
349                 @Override
350                 protected Boolean match(Object o) throws Exception {
351                     if (o instanceof InitiateInstallSnapshot) {
352                         return true;
353                     }
354                     return false;
355                 }
356             }.get();
357
358             boolean initiateInitiateInstallSnapshot = false;
359             for (Boolean b: matches) {
360                 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
361             }
362
363             assertTrue(initiateInitiateInstallSnapshot);
364         }};
365     }
366
367     @Test
368     public void testInitiateInstallSnapshot() throws Exception {
369         new JavaTestKit(getSystem()) {{
370
371             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
372
373             ActorRef followerActor = getTestActor();
374
375             Map<String, String> peerAddresses = new HashMap<>();
376             peerAddresses.put(followerActor.path().toString(),
377                 followerActor.path().toString());
378
379
380             MockRaftActorContext actorContext =
381                 (MockRaftActorContext) createActorContext(leaderActor);
382             actorContext.setPeerAddresses(peerAddresses);
383
384             Map<String, String> leadersSnapshot = new HashMap<>();
385             leadersSnapshot.put("1", "A");
386             leadersSnapshot.put("2", "B");
387             leadersSnapshot.put("3", "C");
388
389             //clears leaders log
390             actorContext.getReplicatedLog().removeFrom(0);
391
392             final int followersLastIndex = 2;
393             final int snapshotIndex = 3;
394             final int newEntryIndex = 4;
395             final int snapshotTerm = 1;
396             final int currentTerm = 2;
397
398             // set the snapshot variables in replicatedlog
399             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
400             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
401             actorContext.setLastApplied(3);
402             actorContext.setCommitIndex(followersLastIndex);
403
404             Leader leader = new Leader(actorContext);
405             // set the snapshot as absent and check if capture-snapshot is invoked.
406             leader.setSnapshot(Optional.<ByteString>absent());
407
408             // new entry
409             ReplicatedLogImplEntry entry =
410                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
411                     new MockRaftActorContext.MockPayload("D"));
412
413             actorContext.getReplicatedLog().append(entry);
414
415             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
416             RaftActorBehavior raftBehavior = leader.handleMessage(
417                 leaderActor, new InitiateInstallSnapshot());
418
419             CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
420                 getFirstMatching(leaderActor, CaptureSnapshot.class);
421
422             assertNotNull(cs);
423
424             assertTrue(cs.isInstallSnapshotInitiated());
425             assertEquals(3, cs.getLastAppliedIndex());
426             assertEquals(1, cs.getLastAppliedTerm());
427             assertEquals(4, cs.getLastIndex());
428             assertEquals(2, cs.getLastTerm());
429         }};
430     }
431
432     @Test
433     public void testInstallSnapshot() {
434         new JavaTestKit(getSystem()) {{
435
436             ActorRef followerActor = getTestActor();
437
438             Map<String, String> peerAddresses = new HashMap<>();
439             peerAddresses.put(followerActor.path().toString(),
440                 followerActor.path().toString());
441
442             MockRaftActorContext actorContext =
443                 (MockRaftActorContext) createActorContext();
444             actorContext.setPeerAddresses(peerAddresses);
445
446
447             Map<String, String> leadersSnapshot = new HashMap<>();
448             leadersSnapshot.put("1", "A");
449             leadersSnapshot.put("2", "B");
450             leadersSnapshot.put("3", "C");
451
452             //clears leaders log
453             actorContext.getReplicatedLog().removeFrom(0);
454
455             final int followersLastIndex = 2;
456             final int snapshotIndex = 3;
457             final int newEntryIndex = 4;
458             final int snapshotTerm = 1;
459             final int currentTerm = 2;
460
461             // set the snapshot variables in replicatedlog
462             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
463             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
464             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
465             actorContext.setCommitIndex(followersLastIndex);
466
467             Leader leader = new Leader(actorContext);
468
469             // new entry
470             ReplicatedLogImplEntry entry =
471                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
472                     new MockRaftActorContext.MockPayload("D"));
473
474             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
475                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
476
477             assertTrue(raftBehavior instanceof Leader);
478
479             // check if installsnapshot gets called with the correct values.
480             final String out =
481                 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
482                     // do not put code outside this method, will run afterwards
483                     protected String match(Object in) {
484                         if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
485                             InstallSnapshot is = (InstallSnapshot)
486                                 SerializationUtils.fromSerializable(in);
487                             if (is.getData() == null) {
488                                 return "InstallSnapshot data is null";
489                             }
490                             if (is.getLastIncludedIndex() != snapshotIndex) {
491                                 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
492                             }
493                             if (is.getLastIncludedTerm() != snapshotTerm) {
494                                 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
495                             }
496                             if (is.getTerm() == currentTerm) {
497                                 return is.getTerm() + "!=" + currentTerm;
498                             }
499
500                             return "match";
501
502                         } else {
503                             return "message mismatch:" + in.getClass();
504                         }
505                     }
506                 }.get(); // this extracts the received message
507
508             assertEquals("match", out);
509         }};
510     }
511
512     @Test
513     public void testHandleInstallSnapshotReplyLastChunk() {
514         new JavaTestKit(getSystem()) {{
515
516             ActorRef followerActor = getTestActor();
517
518             Map<String, String> peerAddresses = new HashMap<>();
519             peerAddresses.put(followerActor.path().toString(),
520                 followerActor.path().toString());
521
522             final int followersLastIndex = 2;
523             final int snapshotIndex = 3;
524             final int newEntryIndex = 4;
525             final int snapshotTerm = 1;
526             final int currentTerm = 2;
527
528             MockRaftActorContext actorContext =
529                 (MockRaftActorContext) createActorContext();
530             actorContext.setPeerAddresses(peerAddresses);
531             actorContext.setCommitIndex(followersLastIndex);
532
533             MockLeader leader = new MockLeader(actorContext);
534
535             Map<String, String> leadersSnapshot = new HashMap<>();
536             leadersSnapshot.put("1", "A");
537             leadersSnapshot.put("2", "B");
538             leadersSnapshot.put("3", "C");
539
540             // set the snapshot variables in replicatedlog
541
542             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
543             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
544             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
545
546             ByteString bs = toByteString(leadersSnapshot);
547             leader.setSnapshot(Optional.of(bs));
548             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
549             while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
550                 leader.getFollowerToSnapshot().getNextChunk();
551                 leader.getFollowerToSnapshot().incrementChunkIndex();
552             }
553
554             //clears leaders log
555             actorContext.getReplicatedLog().removeFrom(0);
556
557             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
558                 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
559                     leader.getFollowerToSnapshot().getChunkIndex(), true));
560
561             assertTrue(raftBehavior instanceof Leader);
562
563             assertEquals(leader.mapFollowerToSnapshot.size(), 0);
564             assertEquals(leader.followerToLog.size(), 1);
565             assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
566             FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
567             assertEquals(snapshotIndex, fli.getMatchIndex().get());
568             assertEquals(snapshotIndex, fli.getMatchIndex().get());
569             assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
570         }};
571     }
572
573     @Test
574     public void testFollowerToSnapshotLogic() {
575
576         MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
577
578         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
579             @Override
580             public int getSnapshotChunkSize() {
581                 return 50;
582             }
583         });
584
585         MockLeader leader = new MockLeader(actorContext);
586
587         Map<String, String> leadersSnapshot = new HashMap<>();
588         leadersSnapshot.put("1", "A");
589         leadersSnapshot.put("2", "B");
590         leadersSnapshot.put("3", "C");
591
592         ByteString bs = toByteString(leadersSnapshot);
593         byte[] barray = bs.toByteArray();
594
595         leader.createFollowerToSnapshot("followerId", bs);
596         assertEquals(bs.size(), barray.length);
597
598         int chunkIndex=0;
599         for (int i=0; i < barray.length; i = i + 50) {
600             int j = i + 50;
601             chunkIndex++;
602
603             if (i + 50 > barray.length) {
604                 j = barray.length;
605             }
606
607             ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
608             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
609             assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
610
611             leader.getFollowerToSnapshot().markSendStatus(true);
612             if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
613                 leader.getFollowerToSnapshot().incrementChunkIndex();
614             }
615         }
616
617         assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
618     }
619
620
621     @Override protected RaftActorBehavior createBehavior(
622         RaftActorContext actorContext) {
623         return new Leader(actorContext);
624     }
625
626     @Override protected RaftActorContext createActorContext() {
627         return createActorContext(leaderActor);
628     }
629
630     protected RaftActorContext createActorContext(ActorRef actorRef) {
631         return new MockRaftActorContext("test", getSystem(), actorRef);
632     }
633
634     private ByteString toByteString(Map<String, String> state) {
635         ByteArrayOutputStream b = null;
636         ObjectOutputStream o = null;
637         try {
638             try {
639                 b = new ByteArrayOutputStream();
640                 o = new ObjectOutputStream(b);
641                 o.writeObject(state);
642                 byte[] snapshotBytes = b.toByteArray();
643                 return ByteString.copyFrom(snapshotBytes);
644             } finally {
645                 if (o != null) {
646                     o.flush();
647                     o.close();
648                 }
649                 if (b != null) {
650                     b.close();
651                 }
652             }
653         } catch (IOException e) {
654             Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
655         }
656         return null;
657     }
658
659     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
660         private static AbstractRaftActorBehavior behavior;
661
662         public ForwardMessageToBehaviorActor(){
663
664         }
665
666         @Override public void onReceive(Object message) throws Exception {
667             super.onReceive(message);
668             behavior.handleMessage(sender(), message);
669         }
670
671         public static void setBehavior(AbstractRaftActorBehavior behavior){
672             ForwardMessageToBehaviorActor.behavior = behavior;
673         }
674     }
675
676     @Test
677     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
678         new JavaTestKit(getSystem()) {{
679
680             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
681
682             MockRaftActorContext leaderActorContext =
683                 new MockRaftActorContext("leader", getSystem(), leaderActor);
684
685             ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
686
687             MockRaftActorContext followerActorContext =
688                 new MockRaftActorContext("follower", getSystem(), followerActor);
689
690             Follower follower = new Follower(followerActorContext);
691
692             ForwardMessageToBehaviorActor.setBehavior(follower);
693
694             Map<String, String> peerAddresses = new HashMap<>();
695             peerAddresses.put(followerActor.path().toString(),
696                 followerActor.path().toString());
697
698             leaderActorContext.setPeerAddresses(peerAddresses);
699
700             leaderActorContext.getReplicatedLog().removeFrom(0);
701
702             //create 3 entries
703             leaderActorContext.setReplicatedLog(
704                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
705
706             leaderActorContext.setCommitIndex(1);
707
708             followerActorContext.getReplicatedLog().removeFrom(0);
709
710             // follower too has the exact same log entries and has the same commit index
711             followerActorContext.setReplicatedLog(
712                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
713
714             followerActorContext.setCommitIndex(1);
715
716             Leader leader = new Leader(leaderActorContext);
717             leader.markFollowerActive(followerActor.path().toString());
718
719             leader.handleMessage(leaderActor, new SendHeartBeat());
720
721             AppendEntriesMessages.AppendEntries appendEntries =
722                 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
723                     .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
724
725             assertNotNull(appendEntries);
726
727             assertEquals(1, appendEntries.getLeaderCommit());
728             assertEquals(1, appendEntries.getLogEntries(0).getIndex());
729             assertEquals(0, appendEntries.getPrevLogIndex());
730
731             AppendEntriesReply appendEntriesReply =
732                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
733                     leaderActor, AppendEntriesReply.class);
734
735             assertNotNull(appendEntriesReply);
736
737             // follower returns its next index
738             assertEquals(2, appendEntriesReply.getLogLastIndex());
739             assertEquals(1, appendEntriesReply.getLogLastTerm());
740
741         }};
742     }
743
744
745     @Test
746     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
747         new JavaTestKit(getSystem()) {{
748
749             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
750
751             MockRaftActorContext leaderActorContext =
752                 new MockRaftActorContext("leader", getSystem(), leaderActor);
753
754             ActorRef followerActor = getSystem().actorOf(
755                 Props.create(ForwardMessageToBehaviorActor.class));
756
757             MockRaftActorContext followerActorContext =
758                 new MockRaftActorContext("follower", getSystem(), followerActor);
759
760             Follower follower = new Follower(followerActorContext);
761
762             ForwardMessageToBehaviorActor.setBehavior(follower);
763
764             Map<String, String> peerAddresses = new HashMap<>();
765             peerAddresses.put(followerActor.path().toString(),
766                 followerActor.path().toString());
767
768             leaderActorContext.setPeerAddresses(peerAddresses);
769
770             leaderActorContext.getReplicatedLog().removeFrom(0);
771
772             leaderActorContext.setReplicatedLog(
773                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
774
775             leaderActorContext.setCommitIndex(1);
776
777             followerActorContext.getReplicatedLog().removeFrom(0);
778
779             followerActorContext.setReplicatedLog(
780                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
781
782             // follower has the same log entries but its commit index > leaders commit index
783             followerActorContext.setCommitIndex(2);
784
785             Leader leader = new Leader(leaderActorContext);
786             leader.markFollowerActive(followerActor.path().toString());
787
788             leader.handleMessage(leaderActor, new SendHeartBeat());
789
790             AppendEntriesMessages.AppendEntries appendEntries =
791                 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
792                     .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
793
794             assertNotNull(appendEntries);
795
796             assertEquals(1, appendEntries.getLeaderCommit());
797             assertEquals(1, appendEntries.getLogEntries(0).getIndex());
798             assertEquals(0, appendEntries.getPrevLogIndex());
799
800             AppendEntriesReply appendEntriesReply =
801                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
802                     leaderActor, AppendEntriesReply.class);
803
804             assertNotNull(appendEntriesReply);
805
806             assertEquals(2, appendEntriesReply.getLogLastIndex());
807             assertEquals(1, appendEntriesReply.getLogLastTerm());
808
809         }};
810     }
811
812     @Test
813     public void testHandleAppendEntriesReplyFailure(){
814         new JavaTestKit(getSystem()) {
815             {
816
817                 ActorRef leaderActor =
818                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
819
820                 ActorRef followerActor =
821                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
822
823
824                 MockRaftActorContext leaderActorContext =
825                     new MockRaftActorContext("leader", getSystem(), leaderActor);
826
827                 Map<String, String> peerAddresses = new HashMap<>();
828                 peerAddresses.put("follower-1",
829                     followerActor.path().toString());
830
831                 leaderActorContext.setPeerAddresses(peerAddresses);
832
833                 Leader leader = new Leader(leaderActorContext);
834
835                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
836
837                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
838
839                 assertEquals(RaftState.Leader, raftActorBehavior.state());
840
841             }};
842     }
843
844     @Test
845     public void testHandleAppendEntriesReplySuccess() throws Exception {
846         new JavaTestKit(getSystem()) {
847             {
848
849                 ActorRef leaderActor =
850                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
851
852                 ActorRef followerActor =
853                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
854
855
856                 MockRaftActorContext leaderActorContext =
857                     new MockRaftActorContext("leader", getSystem(), leaderActor);
858
859                 leaderActorContext.setReplicatedLog(
860                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
861
862                 Map<String, String> peerAddresses = new HashMap<>();
863                 peerAddresses.put("follower-1",
864                     followerActor.path().toString());
865
866                 leaderActorContext.setPeerAddresses(peerAddresses);
867                 leaderActorContext.setCommitIndex(1);
868                 leaderActorContext.setLastApplied(1);
869                 leaderActorContext.getTermInformation().update(1, "leader");
870
871                 Leader leader = new Leader(leaderActorContext);
872
873                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
874
875                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
876
877                 assertEquals(RaftState.Leader, raftActorBehavior.state());
878
879                 assertEquals(2, leaderActorContext.getCommitIndex());
880
881                 ApplyLogEntries applyLogEntries =
882                     (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
883                         ApplyLogEntries.class);
884
885                 assertNotNull(applyLogEntries);
886
887                 assertEquals(2, leaderActorContext.getLastApplied());
888
889                 assertEquals(2, applyLogEntries.getToIndex());
890
891                 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
892                     ApplyState.class);
893
894                 assertEquals(1,applyStateList.size());
895
896                 ApplyState applyState = (ApplyState) applyStateList.get(0);
897
898                 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
899
900             }};
901     }
902
903     @Test
904     public void testHandleAppendEntriesReplyUnknownFollower(){
905         new JavaTestKit(getSystem()) {
906             {
907
908                 ActorRef leaderActor =
909                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
910
911                 MockRaftActorContext leaderActorContext =
912                     new MockRaftActorContext("leader", getSystem(), leaderActor);
913
914                 Leader leader = new Leader(leaderActorContext);
915
916                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
917
918                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
919
920                 assertEquals(RaftState.Leader, raftActorBehavior.state());
921
922             }};
923     }
924
925     @Test
926     public void testHandleRequestVoteReply(){
927         new JavaTestKit(getSystem()) {
928             {
929
930                 ActorRef leaderActor =
931                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
932
933                 MockRaftActorContext leaderActorContext =
934                     new MockRaftActorContext("leader", getSystem(), leaderActor);
935
936                 Leader leader = new Leader(leaderActorContext);
937
938                 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
939
940                 assertEquals(RaftState.Leader, raftActorBehavior.state());
941
942                 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
943
944                 assertEquals(RaftState.Leader, raftActorBehavior.state());
945             }};
946     }
947
948     @Test
949     public void testIsolatedLeaderCheckNoFollowers() {
950         new JavaTestKit(getSystem()) {{
951             ActorRef leaderActor = getTestActor();
952
953             MockRaftActorContext leaderActorContext =
954                 new MockRaftActorContext("leader", getSystem(), leaderActor);
955
956             Map<String, String> peerAddresses = new HashMap<>();
957             leaderActorContext.setPeerAddresses(peerAddresses);
958
959             Leader leader = new Leader(leaderActorContext);
960             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
961             Assert.assertTrue(behavior instanceof Leader);
962         }};
963     }
964
965     @Test
966     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
967         new JavaTestKit(getSystem()) {{
968
969             ActorRef followerActor1 = getTestActor();
970             ActorRef followerActor2 = getTestActor();
971
972             MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
973
974             Map<String, String> peerAddresses = new HashMap<>();
975             peerAddresses.put("follower-1", followerActor1.path().toString());
976             peerAddresses.put("follower-2", followerActor2.path().toString());
977
978             leaderActorContext.setPeerAddresses(peerAddresses);
979
980             Leader leader = new Leader(leaderActorContext);
981             leader.stopIsolatedLeaderCheckSchedule();
982
983             leader.markFollowerActive("follower-1");
984             leader.markFollowerActive("follower-2");
985             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
986             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
987                 behavior instanceof Leader);
988
989             // kill 1 follower and verify if that got killed
990             final JavaTestKit probe = new JavaTestKit(getSystem());
991             probe.watch(followerActor1);
992             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
993             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
994             assertEquals(termMsg1.getActor(), followerActor1);
995
996             leader.markFollowerInActive("follower-1");
997             leader.markFollowerActive("follower-2");
998             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
999             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1000                 behavior instanceof Leader);
1001
1002             // kill 2nd follower and leader should change to Isolated leader
1003             followerActor2.tell(PoisonPill.getInstance(), null);
1004             probe.watch(followerActor2);
1005             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1006             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1007             assertEquals(termMsg2.getActor(), followerActor2);
1008
1009             leader.markFollowerInActive("follower-2");
1010             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1011             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1012                 behavior instanceof IsolatedLeader);
1013
1014         }};
1015     }
1016
1017     class MockLeader extends Leader {
1018
1019         FollowerToSnapshot fts;
1020
1021         public MockLeader(RaftActorContext context){
1022             super(context);
1023         }
1024
1025         public FollowerToSnapshot getFollowerToSnapshot() {
1026             return fts;
1027         }
1028
1029         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1030             fts = new FollowerToSnapshot(bs);
1031             mapFollowerToSnapshot.put(followerId, fts);
1032
1033         }
1034     }
1035
1036     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1037
1038         private long electionTimeOutIntervalMillis;
1039         private int snapshotChunkSize;
1040
1041         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1042             super();
1043             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1044             this.snapshotChunkSize = snapshotChunkSize;
1045         }
1046
1047         @Override
1048         public FiniteDuration getElectionTimeOutInterval() {
1049             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1050         }
1051
1052         @Override
1053         public int getSnapshotChunkSize() {
1054             return snapshotChunkSize;
1055         }
1056     }
1057 }