Merge "Remove l2switch sample"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.Props;
6 import akka.testkit.JavaTestKit;
7 import com.google.protobuf.ByteString;
8 import org.junit.Assert;
9 import org.junit.Test;
10 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
11 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
12 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
13 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
14 import org.opendaylight.controller.cluster.raft.RaftActorContext;
15 import org.opendaylight.controller.cluster.raft.RaftState;
16 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
17 import org.opendaylight.controller.cluster.raft.SerializationUtils;
18 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
19 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
20 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
21 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
22 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
23 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
24 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
25 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
26 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
27 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
28 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
29 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
30 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
31 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
32
33 import java.io.ByteArrayOutputStream;
34 import java.io.IOException;
35 import java.io.ObjectOutputStream;
36 import java.util.HashMap;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.concurrent.atomic.AtomicLong;
40
41 import static org.junit.Assert.assertEquals;
42 import static org.junit.Assert.assertNotNull;
43 import static org.junit.Assert.assertTrue;
44
45 public class LeaderTest extends AbstractRaftActorBehaviorTest {
46
47     private ActorRef leaderActor =
48         getSystem().actorOf(Props.create(DoNothingActor.class));
49     private ActorRef senderActor =
50         getSystem().actorOf(Props.create(DoNothingActor.class));
51
52     @Test
53     public void testHandleMessageForUnknownMessage() throws Exception {
54         new JavaTestKit(getSystem()) {{
55             Leader leader =
56                 new Leader(createActorContext());
57
58             // handle message should return the Leader state when it receives an
59             // unknown message
60             RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
61             Assert.assertTrue(behavior instanceof Leader);
62         }};
63     }
64
65
66     @Test
67     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
68         new JavaTestKit(getSystem()) {{
69
70             new Within(duration("1 seconds")) {
71                 protected void run() {
72
73                     ActorRef followerActor = getTestActor();
74
75                     MockRaftActorContext actorContext =
76                         (MockRaftActorContext) createActorContext();
77
78                     Map<String, String> peerAddresses = new HashMap();
79
80                     peerAddresses.put(followerActor.path().toString(),
81                         followerActor.path().toString());
82
83                     actorContext.setPeerAddresses(peerAddresses);
84
85                     Leader leader = new Leader(actorContext);
86                     leader.handleMessage(senderActor, new SendHeartBeat());
87
88                     final String out =
89                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
90                             // do not put code outside this method, will run afterwards
91                             protected String match(Object in) {
92                                 Object msg = fromSerializableMessage(in);
93                                 if (msg instanceof AppendEntries) {
94                                     if (((AppendEntries)msg).getTerm() == 0) {
95                                         return "match";
96                                     }
97                                     return null;
98                                 } else {
99                                     throw noMatch();
100                                 }
101                             }
102                         }.get(); // this extracts the received message
103
104                     assertEquals("match", out);
105
106                 }
107             };
108         }};
109     }
110
111     @Test
112     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
113         new JavaTestKit(getSystem()) {{
114
115             new Within(duration("1 seconds")) {
116                 protected void run() {
117
118                     ActorRef followerActor = getTestActor();
119
120                     MockRaftActorContext actorContext =
121                         (MockRaftActorContext) createActorContext();
122
123                     Map<String, String> peerAddresses = new HashMap();
124
125                     peerAddresses.put(followerActor.path().toString(),
126                         followerActor.path().toString());
127
128                     actorContext.setPeerAddresses(peerAddresses);
129
130                     Leader leader = new Leader(actorContext);
131                     RaftActorBehavior raftBehavior = leader
132                         .handleMessage(senderActor, new Replicate(null, null,
133                             new MockRaftActorContext.MockReplicatedLogEntry(1,
134                                 100,
135                                 new MockRaftActorContext.MockPayload("foo"))
136                         ));
137
138                     // State should not change
139                     assertTrue(raftBehavior instanceof Leader);
140
141                     final String out =
142                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
143                             // do not put code outside this method, will run afterwards
144                             protected String match(Object in) {
145                                 Object msg = fromSerializableMessage(in);
146                                 if (msg instanceof AppendEntries) {
147                                     if (((AppendEntries)msg).getTerm() == 0) {
148                                         return "match";
149                                     }
150                                     return null;
151                                 } else {
152                                     throw noMatch();
153                                 }
154                             }
155                         }.get(); // this extracts the received message
156
157                     assertEquals("match", out);
158
159                 }
160
161
162             };
163         }};
164     }
165
166     @Test
167     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
168         new JavaTestKit(getSystem()) {{
169
170             new Within(duration("1 seconds")) {
171                 protected void run() {
172
173                     ActorRef raftActor = getTestActor();
174
175                     MockRaftActorContext actorContext =
176                         new MockRaftActorContext("test", getSystem(), raftActor);
177
178                     actorContext.getReplicatedLog().removeFrom(0);
179
180                     actorContext.setReplicatedLog(
181                         new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
182                             .build());
183
184                     Leader leader = new Leader(actorContext);
185                     RaftActorBehavior raftBehavior = leader
186                         .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
187
188                     // State should not change
189                     assertTrue(raftBehavior instanceof Leader);
190
191                     assertEquals(1, actorContext.getCommitIndex());
192
193                     final String out =
194                         new ExpectMsg<String>(duration("1 seconds"),
195                             "match hint") {
196                             // do not put code outside this method, will run afterwards
197                             protected String match(Object in) {
198                                 if (in instanceof ApplyState) {
199                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
200                                         return "match";
201                                     }
202                                     return null;
203                                 } else {
204                                     throw noMatch();
205                                 }
206                             }
207                         }.get(); // this extracts the received message
208
209                     assertEquals("match", out);
210
211                 }
212             };
213         }};
214     }
215
216     @Test
217     public void testSendInstallSnapshot() {
218         new LeaderTestKit(getSystem()) {{
219
220             new Within(duration("1 seconds")) {
221                 protected void run() {
222                     ActorRef followerActor = getTestActor();
223
224                     Map<String, String> peerAddresses = new HashMap();
225                     peerAddresses.put(followerActor.path().toString(),
226                         followerActor.path().toString());
227
228
229                     MockRaftActorContext actorContext =
230                         (MockRaftActorContext) createActorContext(getRef());
231                     actorContext.setPeerAddresses(peerAddresses);
232
233
234                     Map<String, String> leadersSnapshot = new HashMap<>();
235                     leadersSnapshot.put("1", "A");
236                     leadersSnapshot.put("2", "B");
237                     leadersSnapshot.put("3", "C");
238
239                     //clears leaders log
240                     actorContext.getReplicatedLog().removeFrom(0);
241
242                     final int followersLastIndex = 2;
243                     final int snapshotIndex = 3;
244                     final int newEntryIndex = 4;
245                     final int snapshotTerm = 1;
246                     final int currentTerm = 2;
247
248                     // set the snapshot variables in replicatedlog
249                     actorContext.getReplicatedLog().setSnapshot(
250                         toByteString(leadersSnapshot));
251                     actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
252                     actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
253
254                     MockLeader leader = new MockLeader(actorContext);
255                     // set the follower info in leader
256                     leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
257
258                     // new entry
259                     ReplicatedLogImplEntry entry =
260                         new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
261                             new MockRaftActorContext.MockPayload("D"));
262
263                     // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
264                     RaftActorBehavior raftBehavior = leader.handleMessage(
265                         senderActor, new Replicate(null, "state-id", entry));
266
267                     assertTrue(raftBehavior instanceof Leader);
268
269                     // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
270                     Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
271                         @Override
272                         protected Boolean match(Object o) throws Exception {
273                             if (o instanceof SendInstallSnapshot) {
274                                 return true;
275                             }
276                             return false;
277                         }
278                     }.get();
279
280                     boolean sendInstallSnapshotReceived = false;
281                     for (Boolean b: matches) {
282                         sendInstallSnapshotReceived = b | sendInstallSnapshotReceived;
283                     }
284
285                     assertTrue(sendInstallSnapshotReceived);
286
287                 }
288             };
289         }};
290     }
291
292     @Test
293     public void testInstallSnapshot() {
294         new LeaderTestKit(getSystem()) {{
295
296             new Within(duration("1 seconds")) {
297                 protected void run() {
298                     ActorRef followerActor = getTestActor();
299
300                     Map<String, String> peerAddresses = new HashMap();
301                     peerAddresses.put(followerActor.path().toString(),
302                         followerActor.path().toString());
303
304                     MockRaftActorContext actorContext =
305                         (MockRaftActorContext) createActorContext();
306                     actorContext.setPeerAddresses(peerAddresses);
307
308
309                     Map<String, String> leadersSnapshot = new HashMap<>();
310                     leadersSnapshot.put("1", "A");
311                     leadersSnapshot.put("2", "B");
312                     leadersSnapshot.put("3", "C");
313
314                     //clears leaders log
315                     actorContext.getReplicatedLog().removeFrom(0);
316
317                     final int followersLastIndex = 2;
318                     final int snapshotIndex = 3;
319                     final int newEntryIndex = 4;
320                     final int snapshotTerm = 1;
321                     final int currentTerm = 2;
322
323                     // set the snapshot variables in replicatedlog
324                     actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot));
325                     actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
326                     actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
327
328                     actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
329
330                     MockLeader leader = new MockLeader(actorContext);
331                     // set the follower info in leader
332                     leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
333
334                     // new entry
335                     ReplicatedLogImplEntry entry =
336                         new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
337                             new MockRaftActorContext.MockPayload("D"));
338
339                     RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, new SendInstallSnapshot());
340
341                     assertTrue(raftBehavior instanceof Leader);
342
343                     // check if installsnapshot gets called with the correct values.
344                     final String out =
345                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
346                             // do not put code outside this method, will run afterwards
347                             protected String match(Object in) {
348                                 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
349                                     InstallSnapshot is = (InstallSnapshot)
350                                         SerializationUtils.fromSerializable(in);
351                                     if (is.getData() == null) {
352                                         return "InstallSnapshot data is null";
353                                     }
354                                     if (is.getLastIncludedIndex() != snapshotIndex) {
355                                         return is.getLastIncludedIndex() + "!=" + snapshotIndex;
356                                     }
357                                     if (is.getLastIncludedTerm() != snapshotTerm) {
358                                         return is.getLastIncludedTerm() + "!=" + snapshotTerm;
359                                     }
360                                     if (is.getTerm() == currentTerm) {
361                                         return is.getTerm() + "!=" + currentTerm;
362                                     }
363
364                                     return "match";
365
366                                } else {
367                                     return "message mismatch:" + in.getClass();
368                                 }
369                             }
370                         }.get(); // this extracts the received message
371
372                     assertEquals("match", out);
373                 }
374             };
375         }};
376     }
377
378     @Test
379     public void testHandleInstallSnapshotReplyLastChunk() {
380         new LeaderTestKit(getSystem()) {{
381             new Within(duration("1 seconds")) {
382                 protected void run() {
383                     ActorRef followerActor = getTestActor();
384
385                     Map<String, String> peerAddresses = new HashMap();
386                     peerAddresses.put(followerActor.path().toString(),
387                         followerActor.path().toString());
388
389                     MockRaftActorContext actorContext =
390                         (MockRaftActorContext) createActorContext();
391                     actorContext.setPeerAddresses(peerAddresses);
392
393                     final int followersLastIndex = 2;
394                     final int snapshotIndex = 3;
395                     final int newEntryIndex = 4;
396                     final int snapshotTerm = 1;
397                     final int currentTerm = 2;
398
399                     MockLeader leader = new MockLeader(actorContext);
400                     // set the follower info in leader
401                     leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
402
403                     Map<String, String> leadersSnapshot = new HashMap<>();
404                     leadersSnapshot.put("1", "A");
405                     leadersSnapshot.put("2", "B");
406                     leadersSnapshot.put("3", "C");
407
408                     // set the snapshot variables in replicatedlog
409                     actorContext.getReplicatedLog().setSnapshot(
410                         toByteString(leadersSnapshot));
411                     actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
412                     actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
413                     actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
414
415                     ByteString bs = toByteString(leadersSnapshot);
416                     leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
417                     while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
418                         leader.getFollowerToSnapshot().getNextChunk();
419                         leader.getFollowerToSnapshot().incrementChunkIndex();
420                     }
421
422                     //clears leaders log
423                     actorContext.getReplicatedLog().removeFrom(0);
424
425                     RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
426                         new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
427                             leader.getFollowerToSnapshot().getChunkIndex(), true));
428
429                     assertTrue(raftBehavior instanceof Leader);
430
431                     assertEquals(leader.mapFollowerToSnapshot.size(), 0);
432                     assertEquals(leader.followerToLog.size(), 1);
433                     assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
434                     FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
435                     assertEquals(snapshotIndex, fli.getMatchIndex().get());
436                     assertEquals(snapshotIndex, fli.getMatchIndex().get());
437                     assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
438                 }
439             };
440         }};
441     }
442
443     @Test
444     public void testFollowerToSnapshotLogic() {
445
446         MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
447
448         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
449             @Override
450             public int getSnapshotChunkSize() {
451                 return 50;
452             }
453         });
454
455         MockLeader leader = new MockLeader(actorContext);
456
457         Map<String, String> leadersSnapshot = new HashMap<>();
458         leadersSnapshot.put("1", "A");
459         leadersSnapshot.put("2", "B");
460         leadersSnapshot.put("3", "C");
461
462         ByteString bs = toByteString(leadersSnapshot);
463         byte[] barray = bs.toByteArray();
464
465         leader.createFollowerToSnapshot("followerId", bs);
466         assertEquals(bs.size(), barray.length);
467
468         int chunkIndex=0;
469         for (int i=0; i < barray.length; i = i + 50) {
470             int j = i + 50;
471             chunkIndex++;
472
473             if (i + 50 > barray.length) {
474                 j = barray.length;
475             }
476
477             ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
478             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
479             assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
480
481             leader.getFollowerToSnapshot().markSendStatus(true);
482             if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
483                 leader.getFollowerToSnapshot().incrementChunkIndex();
484             }
485         }
486
487         assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
488     }
489
490
491     @Override protected RaftActorBehavior createBehavior(
492         RaftActorContext actorContext) {
493         return new Leader(actorContext);
494     }
495
496     @Override protected RaftActorContext createActorContext() {
497         return createActorContext(leaderActor);
498     }
499
500     protected RaftActorContext createActorContext(ActorRef actorRef) {
501         return new MockRaftActorContext("test", getSystem(), actorRef);
502     }
503
504     private ByteString toByteString(Map<String, String> state) {
505         ByteArrayOutputStream b = null;
506         ObjectOutputStream o = null;
507         try {
508             try {
509                 b = new ByteArrayOutputStream();
510                 o = new ObjectOutputStream(b);
511                 o.writeObject(state);
512                 byte[] snapshotBytes = b.toByteArray();
513                 return ByteString.copyFrom(snapshotBytes);
514             } finally {
515                 if (o != null) {
516                     o.flush();
517                     o.close();
518                 }
519                 if (b != null) {
520                     b.close();
521                 }
522             }
523         } catch (IOException e) {
524             Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
525         }
526         return null;
527     }
528
529     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
530         private static AbstractRaftActorBehavior behavior;
531
532         public ForwardMessageToBehaviorActor(){
533
534         }
535
536         @Override public void onReceive(Object message) throws Exception {
537             super.onReceive(message);
538             behavior.handleMessage(sender(), message);
539         }
540
541         public static void setBehavior(AbstractRaftActorBehavior behavior){
542             ForwardMessageToBehaviorActor.behavior = behavior;
543         }
544     }
545
546     @Test
547     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
548         new JavaTestKit(getSystem()) {{
549
550             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
551
552             MockRaftActorContext leaderActorContext =
553                 new MockRaftActorContext("leader", getSystem(), leaderActor);
554
555             ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
556
557             MockRaftActorContext followerActorContext =
558                 new MockRaftActorContext("follower", getSystem(), followerActor);
559
560             Follower follower = new Follower(followerActorContext);
561
562             ForwardMessageToBehaviorActor.setBehavior(follower);
563
564             Map<String, String> peerAddresses = new HashMap();
565             peerAddresses.put(followerActor.path().toString(),
566                 followerActor.path().toString());
567
568             leaderActorContext.setPeerAddresses(peerAddresses);
569
570             leaderActorContext.getReplicatedLog().removeFrom(0);
571
572             //create 3 entries
573             leaderActorContext.setReplicatedLog(
574                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
575
576             leaderActorContext.setCommitIndex(1);
577
578             followerActorContext.getReplicatedLog().removeFrom(0);
579
580             // follower too has the exact same log entries and has the same commit index
581             followerActorContext.setReplicatedLog(
582                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
583
584             followerActorContext.setCommitIndex(1);
585
586             Leader leader = new Leader(leaderActorContext);
587
588             leader.handleMessage(leaderActor, new SendHeartBeat());
589
590             AppendEntriesMessages.AppendEntries appendEntries =
591                 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
592                     .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
593
594             assertNotNull(appendEntries);
595
596             assertEquals(1, appendEntries.getLeaderCommit());
597             assertEquals(1, appendEntries.getLogEntries(0).getIndex());
598             assertEquals(0, appendEntries.getPrevLogIndex());
599
600             AppendEntriesReply appendEntriesReply =
601                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
602                     leaderActor, AppendEntriesReply.class);
603
604             assertNotNull(appendEntriesReply);
605
606             // follower returns its next index
607             assertEquals(2, appendEntriesReply.getLogLastIndex());
608             assertEquals(1, appendEntriesReply.getLogLastTerm());
609
610         }};
611     }
612
613
614     @Test
615     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
616         new JavaTestKit(getSystem()) {{
617
618             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
619
620             MockRaftActorContext leaderActorContext =
621                 new MockRaftActorContext("leader", getSystem(), leaderActor);
622
623             ActorRef followerActor = getSystem().actorOf(
624                 Props.create(ForwardMessageToBehaviorActor.class));
625
626             MockRaftActorContext followerActorContext =
627                 new MockRaftActorContext("follower", getSystem(), followerActor);
628
629             Follower follower = new Follower(followerActorContext);
630
631             ForwardMessageToBehaviorActor.setBehavior(follower);
632
633             Map<String, String> peerAddresses = new HashMap();
634             peerAddresses.put(followerActor.path().toString(),
635                 followerActor.path().toString());
636
637             leaderActorContext.setPeerAddresses(peerAddresses);
638
639             leaderActorContext.getReplicatedLog().removeFrom(0);
640
641             leaderActorContext.setReplicatedLog(
642                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
643
644             leaderActorContext.setCommitIndex(1);
645
646             followerActorContext.getReplicatedLog().removeFrom(0);
647
648             followerActorContext.setReplicatedLog(
649                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
650
651             // follower has the same log entries but its commit index > leaders commit index
652             followerActorContext.setCommitIndex(2);
653
654             Leader leader = new Leader(leaderActorContext);
655
656             leader.handleMessage(leaderActor, new SendHeartBeat());
657
658             AppendEntriesMessages.AppendEntries appendEntries =
659                 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
660                     .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
661
662             assertNotNull(appendEntries);
663
664             assertEquals(1, appendEntries.getLeaderCommit());
665             assertEquals(1, appendEntries.getLogEntries(0).getIndex());
666             assertEquals(0, appendEntries.getPrevLogIndex());
667
668             AppendEntriesReply appendEntriesReply =
669                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
670                     leaderActor, AppendEntriesReply.class);
671
672             assertNotNull(appendEntriesReply);
673
674             assertEquals(2, appendEntriesReply.getLogLastIndex());
675             assertEquals(1, appendEntriesReply.getLogLastTerm());
676
677         }};
678     }
679
680     @Test
681     public void testHandleAppendEntriesReplyFailure(){
682         new JavaTestKit(getSystem()) {
683             {
684
685                 ActorRef leaderActor =
686                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
687
688                 ActorRef followerActor =
689                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
690
691
692                 MockRaftActorContext leaderActorContext =
693                     new MockRaftActorContext("leader", getSystem(), leaderActor);
694
695                 Map<String, String> peerAddresses = new HashMap();
696                 peerAddresses.put("follower-1",
697                     followerActor.path().toString());
698
699                 leaderActorContext.setPeerAddresses(peerAddresses);
700
701                 Leader leader = new Leader(leaderActorContext);
702
703                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
704
705                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
706
707                 assertEquals(RaftState.Leader, raftActorBehavior.state());
708
709             }};
710     }
711
712     @Test
713     public void testHandleAppendEntriesReplySuccess() throws Exception {
714         new JavaTestKit(getSystem()) {
715             {
716
717                 ActorRef leaderActor =
718                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
719
720                 ActorRef followerActor =
721                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
722
723
724                 MockRaftActorContext leaderActorContext =
725                     new MockRaftActorContext("leader", getSystem(), leaderActor);
726
727                 leaderActorContext.setReplicatedLog(
728                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
729
730                 Map<String, String> peerAddresses = new HashMap();
731                 peerAddresses.put("follower-1",
732                     followerActor.path().toString());
733
734                 leaderActorContext.setPeerAddresses(peerAddresses);
735                 leaderActorContext.setCommitIndex(1);
736                 leaderActorContext.setLastApplied(1);
737                 leaderActorContext.getTermInformation().update(1, "leader");
738
739                 Leader leader = new Leader(leaderActorContext);
740
741                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
742
743                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
744
745                 assertEquals(RaftState.Leader, raftActorBehavior.state());
746
747                 assertEquals(2, leaderActorContext.getCommitIndex());
748
749                 ApplyLogEntries applyLogEntries =
750                     (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
751                         ApplyLogEntries.class);
752
753                 assertNotNull(applyLogEntries);
754
755                 assertEquals(2, leaderActorContext.getLastApplied());
756
757                 assertEquals(2, applyLogEntries.getToIndex());
758
759                 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
760                     ApplyState.class);
761
762                 assertEquals(1,applyStateList.size());
763
764                 ApplyState applyState = (ApplyState) applyStateList.get(0);
765
766                 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
767
768             }};
769     }
770
771     @Test
772     public void testHandleAppendEntriesReplyUnknownFollower(){
773         new JavaTestKit(getSystem()) {
774             {
775
776                 ActorRef leaderActor =
777                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
778
779                 MockRaftActorContext leaderActorContext =
780                     new MockRaftActorContext("leader", getSystem(), leaderActor);
781
782                 Leader leader = new Leader(leaderActorContext);
783
784                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
785
786                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
787
788                 assertEquals(RaftState.Leader, raftActorBehavior.state());
789
790             }};
791     }
792
793     @Test
794     public void testHandleRequestVoteReply(){
795         new JavaTestKit(getSystem()) {
796             {
797
798                 ActorRef leaderActor =
799                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
800
801                 MockRaftActorContext leaderActorContext =
802                     new MockRaftActorContext("leader", getSystem(), leaderActor);
803
804                 Leader leader = new Leader(leaderActorContext);
805
806                 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
807
808                 assertEquals(RaftState.Leader, raftActorBehavior.state());
809
810                 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
811
812                 assertEquals(RaftState.Leader, raftActorBehavior.state());
813
814
815             }};
816
817     }
818
819     private static class LeaderTestKit extends JavaTestKit {
820
821         private LeaderTestKit(ActorSystem actorSystem) {
822             super(actorSystem);
823         }
824
825         protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
826             // Wait for a specific log message to show up
827             final boolean result =
828             new JavaTestKit.EventFilter<Boolean>(logLevel
829             ) {
830                 @Override
831                 protected Boolean run() {
832                     return true;
833                 }
834             }.from(subject.path().toString())
835                 .message(logMessage)
836                 .occurrences(1).exec();
837
838             Assert.assertEquals(true, result);
839
840         }
841     }
842
843     class MockLeader extends Leader {
844
845         FollowerToSnapshot fts;
846
847         public MockLeader(RaftActorContext context){
848             super(context);
849         }
850
851         public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) {
852             FollowerLogInformation followerLogInformation =
853                 new FollowerLogInformationImpl(followerId,
854                     new AtomicLong(nextIndex),
855                     new AtomicLong(matchIndex));
856             followerToLog.put(followerId, followerLogInformation);
857         }
858
859         public FollowerToSnapshot getFollowerToSnapshot() {
860             return fts;
861         }
862
863         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
864             fts = new FollowerToSnapshot(bs);
865             mapFollowerToSnapshot.put(followerId, fts);
866
867         }
868     }
869 }