Merge "BUG-2184 Fix config.yang module(add type as a key for modules list)"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.Props;
6 import akka.testkit.JavaTestKit;
7 import com.google.protobuf.ByteString;
8 import org.junit.Assert;
9 import org.junit.Test;
10 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
11 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
12 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
13 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
14 import org.opendaylight.controller.cluster.raft.RaftActorContext;
15 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
16 import org.opendaylight.controller.cluster.raft.SerializationUtils;
17 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
18 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
19 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
20 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
21 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
22 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
23 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
24 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
25 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
26 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
27 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
28 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
29
30 import java.io.ByteArrayOutputStream;
31 import java.io.IOException;
32 import java.io.ObjectOutputStream;
33 import java.util.HashMap;
34 import java.util.Map;
35 import java.util.concurrent.atomic.AtomicLong;
36
37 import static org.junit.Assert.assertEquals;
38 import static org.junit.Assert.assertNotNull;
39 import static org.junit.Assert.assertTrue;
40
41 public class LeaderTest extends AbstractRaftActorBehaviorTest {
42
43     private ActorRef leaderActor =
44         getSystem().actorOf(Props.create(DoNothingActor.class));
45     private ActorRef senderActor =
46         getSystem().actorOf(Props.create(DoNothingActor.class));
47
48     @Test
49     public void testHandleMessageForUnknownMessage() throws Exception {
50         new JavaTestKit(getSystem()) {{
51             Leader leader =
52                 new Leader(createActorContext());
53
54             // handle message should return the Leader state when it receives an
55             // unknown message
56             RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
57             Assert.assertTrue(behavior instanceof Leader);
58         }};
59     }
60
61
62     @Test
63     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
64         new JavaTestKit(getSystem()) {{
65
66             new Within(duration("1 seconds")) {
67                 protected void run() {
68
69                     ActorRef followerActor = getTestActor();
70
71                     MockRaftActorContext actorContext =
72                         (MockRaftActorContext) createActorContext();
73
74                     Map<String, String> peerAddresses = new HashMap();
75
76                     peerAddresses.put(followerActor.path().toString(),
77                         followerActor.path().toString());
78
79                     actorContext.setPeerAddresses(peerAddresses);
80
81                     Leader leader = new Leader(actorContext);
82                     leader.handleMessage(senderActor, new SendHeartBeat());
83
84                     final String out =
85                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
86                             // do not put code outside this method, will run afterwards
87                             protected String match(Object in) {
88                                 Object msg = fromSerializableMessage(in);
89                                 if (msg instanceof AppendEntries) {
90                                     if (((AppendEntries)msg).getTerm() == 0) {
91                                         return "match";
92                                     }
93                                     return null;
94                                 } else {
95                                     throw noMatch();
96                                 }
97                             }
98                         }.get(); // this extracts the received message
99
100                     assertEquals("match", out);
101
102                 }
103             };
104         }};
105     }
106
107     @Test
108     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
109         new JavaTestKit(getSystem()) {{
110
111             new Within(duration("1 seconds")) {
112                 protected void run() {
113
114                     ActorRef followerActor = getTestActor();
115
116                     MockRaftActorContext actorContext =
117                         (MockRaftActorContext) createActorContext();
118
119                     Map<String, String> peerAddresses = new HashMap();
120
121                     peerAddresses.put(followerActor.path().toString(),
122                         followerActor.path().toString());
123
124                     actorContext.setPeerAddresses(peerAddresses);
125
126                     Leader leader = new Leader(actorContext);
127                     RaftActorBehavior raftBehavior = leader
128                         .handleMessage(senderActor, new Replicate(null, null,
129                             new MockRaftActorContext.MockReplicatedLogEntry(1,
130                                 100,
131                                 new MockRaftActorContext.MockPayload("foo"))
132                         ));
133
134                     // State should not change
135                     assertTrue(raftBehavior instanceof Leader);
136
137                     final String out =
138                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
139                             // do not put code outside this method, will run afterwards
140                             protected String match(Object in) {
141                                 Object msg = fromSerializableMessage(in);
142                                 if (msg instanceof AppendEntries) {
143                                     if (((AppendEntries)msg).getTerm() == 0) {
144                                         return "match";
145                                     }
146                                     return null;
147                                 } else {
148                                     throw noMatch();
149                                 }
150                             }
151                         }.get(); // this extracts the received message
152
153                     assertEquals("match", out);
154
155                 }
156
157
158             };
159         }};
160     }
161
162     @Test
163     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
164         new JavaTestKit(getSystem()) {{
165
166             new Within(duration("1 seconds")) {
167                 protected void run() {
168
169                     ActorRef raftActor = getTestActor();
170
171                     MockRaftActorContext actorContext =
172                         new MockRaftActorContext("test", getSystem(), raftActor);
173
174                     actorContext.getReplicatedLog().removeFrom(0);
175
176                     actorContext.setReplicatedLog(
177                         new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
178                             .build());
179
180                     Leader leader = new Leader(actorContext);
181                     RaftActorBehavior raftBehavior = leader
182                         .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
183
184                     // State should not change
185                     assertTrue(raftBehavior instanceof Leader);
186
187                     assertEquals(1, actorContext.getCommitIndex());
188
189                     final String out =
190                         new ExpectMsg<String>(duration("1 seconds"),
191                             "match hint") {
192                             // do not put code outside this method, will run afterwards
193                             protected String match(Object in) {
194                                 if (in instanceof ApplyState) {
195                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
196                                         return "match";
197                                     }
198                                     return null;
199                                 } else {
200                                     throw noMatch();
201                                 }
202                             }
203                         }.get(); // this extracts the received message
204
205                     assertEquals("match", out);
206
207                 }
208             };
209         }};
210     }
211
212     @Test
213     public void testSendInstallSnapshot() {
214         new LeaderTestKit(getSystem()) {{
215
216             new Within(duration("1 seconds")) {
217                 protected void run() {
218                     ActorRef followerActor = getTestActor();
219
220                     Map<String, String> peerAddresses = new HashMap();
221                     peerAddresses.put(followerActor.path().toString(),
222                         followerActor.path().toString());
223
224
225                     MockRaftActorContext actorContext =
226                         (MockRaftActorContext) createActorContext(getRef());
227                     actorContext.setPeerAddresses(peerAddresses);
228
229
230                     Map<String, String> leadersSnapshot = new HashMap<>();
231                     leadersSnapshot.put("1", "A");
232                     leadersSnapshot.put("2", "B");
233                     leadersSnapshot.put("3", "C");
234
235                     //clears leaders log
236                     actorContext.getReplicatedLog().removeFrom(0);
237
238                     final int followersLastIndex = 2;
239                     final int snapshotIndex = 3;
240                     final int newEntryIndex = 4;
241                     final int snapshotTerm = 1;
242                     final int currentTerm = 2;
243
244                     // set the snapshot variables in replicatedlog
245                     actorContext.getReplicatedLog().setSnapshot(
246                         toByteString(leadersSnapshot));
247                     actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
248                     actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
249
250                     MockLeader leader = new MockLeader(actorContext);
251                     // set the follower info in leader
252                     leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
253
254                     // new entry
255                     ReplicatedLogImplEntry entry =
256                         new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
257                             new MockRaftActorContext.MockPayload("D"));
258
259                     // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
260                     RaftActorBehavior raftBehavior = leader.handleMessage(
261                         senderActor, new Replicate(null, "state-id", entry));
262
263                     assertTrue(raftBehavior instanceof Leader);
264
265                     // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
266                     Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
267                         @Override
268                         protected Boolean match(Object o) throws Exception {
269                             if (o instanceof SendInstallSnapshot) {
270                                 return true;
271                             }
272                             return false;
273                         }
274                     }.get();
275
276                     boolean sendInstallSnapshotReceived = false;
277                     for (Boolean b: matches) {
278                         sendInstallSnapshotReceived = b | sendInstallSnapshotReceived;
279                     }
280
281                     assertTrue(sendInstallSnapshotReceived);
282
283                 }
284             };
285         }};
286     }
287
288     @Test
289     public void testInstallSnapshot() {
290         new LeaderTestKit(getSystem()) {{
291
292             new Within(duration("1 seconds")) {
293                 protected void run() {
294                     ActorRef followerActor = getTestActor();
295
296                     Map<String, String> peerAddresses = new HashMap();
297                     peerAddresses.put(followerActor.path().toString(),
298                         followerActor.path().toString());
299
300                     MockRaftActorContext actorContext =
301                         (MockRaftActorContext) createActorContext();
302                     actorContext.setPeerAddresses(peerAddresses);
303
304
305                     Map<String, String> leadersSnapshot = new HashMap<>();
306                     leadersSnapshot.put("1", "A");
307                     leadersSnapshot.put("2", "B");
308                     leadersSnapshot.put("3", "C");
309
310                     //clears leaders log
311                     actorContext.getReplicatedLog().removeFrom(0);
312
313                     final int followersLastIndex = 2;
314                     final int snapshotIndex = 3;
315                     final int newEntryIndex = 4;
316                     final int snapshotTerm = 1;
317                     final int currentTerm = 2;
318
319                     // set the snapshot variables in replicatedlog
320                     actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot));
321                     actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
322                     actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
323
324                     actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
325
326                     MockLeader leader = new MockLeader(actorContext);
327                     // set the follower info in leader
328                     leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
329
330                     // new entry
331                     ReplicatedLogImplEntry entry =
332                         new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
333                             new MockRaftActorContext.MockPayload("D"));
334
335                     RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, new SendInstallSnapshot());
336
337                     assertTrue(raftBehavior instanceof Leader);
338
339                     // check if installsnapshot gets called with the correct values.
340                     final String out =
341                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
342                             // do not put code outside this method, will run afterwards
343                             protected String match(Object in) {
344                                 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
345                                     InstallSnapshot is = (InstallSnapshot)
346                                         SerializationUtils.fromSerializable(in);
347                                     if (is.getData() == null) {
348                                         return "InstallSnapshot data is null";
349                                     }
350                                     if (is.getLastIncludedIndex() != snapshotIndex) {
351                                         return is.getLastIncludedIndex() + "!=" + snapshotIndex;
352                                     }
353                                     if (is.getLastIncludedTerm() != snapshotTerm) {
354                                         return is.getLastIncludedTerm() + "!=" + snapshotTerm;
355                                     }
356                                     if (is.getTerm() == currentTerm) {
357                                         return is.getTerm() + "!=" + currentTerm;
358                                     }
359
360                                     return "match";
361
362                                } else {
363                                     return "message mismatch:" + in.getClass();
364                                 }
365                             }
366                         }.get(); // this extracts the received message
367
368                     assertEquals("match", out);
369                 }
370             };
371         }};
372     }
373
374     @Test
375     public void testHandleInstallSnapshotReplyLastChunk() {
376         new LeaderTestKit(getSystem()) {{
377             new Within(duration("1 seconds")) {
378                 protected void run() {
379                     ActorRef followerActor = getTestActor();
380
381                     Map<String, String> peerAddresses = new HashMap();
382                     peerAddresses.put(followerActor.path().toString(),
383                         followerActor.path().toString());
384
385                     MockRaftActorContext actorContext =
386                         (MockRaftActorContext) createActorContext();
387                     actorContext.setPeerAddresses(peerAddresses);
388
389                     final int followersLastIndex = 2;
390                     final int snapshotIndex = 3;
391                     final int newEntryIndex = 4;
392                     final int snapshotTerm = 1;
393                     final int currentTerm = 2;
394
395                     MockLeader leader = new MockLeader(actorContext);
396                     // set the follower info in leader
397                     leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
398
399                     Map<String, String> leadersSnapshot = new HashMap<>();
400                     leadersSnapshot.put("1", "A");
401                     leadersSnapshot.put("2", "B");
402                     leadersSnapshot.put("3", "C");
403
404                     // set the snapshot variables in replicatedlog
405                     actorContext.getReplicatedLog().setSnapshot(
406                         toByteString(leadersSnapshot));
407                     actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
408                     actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
409                     actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
410
411                     ByteString bs = toByteString(leadersSnapshot);
412                     leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
413                     while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
414                         leader.getFollowerToSnapshot().getNextChunk();
415                         leader.getFollowerToSnapshot().incrementChunkIndex();
416                     }
417
418                     //clears leaders log
419                     actorContext.getReplicatedLog().removeFrom(0);
420
421                     RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
422                         new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
423                             leader.getFollowerToSnapshot().getChunkIndex(), true));
424
425                     assertTrue(raftBehavior instanceof Leader);
426
427                     assertEquals(leader.mapFollowerToSnapshot.size(), 0);
428                     assertEquals(leader.followerToLog.size(), 1);
429                     assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
430                     FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
431                     assertEquals(snapshotIndex, fli.getMatchIndex().get());
432                     assertEquals(snapshotIndex, fli.getMatchIndex().get());
433                     assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
434                 }
435             };
436         }};
437     }
438
439     @Test
440     public void testFollowerToSnapshotLogic() {
441
442         MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
443
444         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
445             @Override
446             public int getSnapshotChunkSize() {
447                 return 50;
448             }
449         });
450
451         MockLeader leader = new MockLeader(actorContext);
452
453         Map<String, String> leadersSnapshot = new HashMap<>();
454         leadersSnapshot.put("1", "A");
455         leadersSnapshot.put("2", "B");
456         leadersSnapshot.put("3", "C");
457
458         ByteString bs = toByteString(leadersSnapshot);
459         byte[] barray = bs.toByteArray();
460
461         leader.createFollowerToSnapshot("followerId", bs);
462         assertEquals(bs.size(), barray.length);
463
464         int chunkIndex=0;
465         for (int i=0; i < barray.length; i = i + 50) {
466             int j = i + 50;
467             chunkIndex++;
468
469             if (i + 50 > barray.length) {
470                 j = barray.length;
471             }
472
473             ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
474             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
475             assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
476
477             leader.getFollowerToSnapshot().markSendStatus(true);
478             if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
479                 leader.getFollowerToSnapshot().incrementChunkIndex();
480             }
481         }
482
483         assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
484     }
485
486
487     @Override protected RaftActorBehavior createBehavior(
488         RaftActorContext actorContext) {
489         return new Leader(actorContext);
490     }
491
492     @Override protected RaftActorContext createActorContext() {
493         return createActorContext(leaderActor);
494     }
495
496     protected RaftActorContext createActorContext(ActorRef actorRef) {
497         return new MockRaftActorContext("test", getSystem(), actorRef);
498     }
499
500     private ByteString toByteString(Map<String, String> state) {
501         ByteArrayOutputStream b = null;
502         ObjectOutputStream o = null;
503         try {
504             try {
505                 b = new ByteArrayOutputStream();
506                 o = new ObjectOutputStream(b);
507                 o.writeObject(state);
508                 byte[] snapshotBytes = b.toByteArray();
509                 return ByteString.copyFrom(snapshotBytes);
510             } finally {
511                 if (o != null) {
512                     o.flush();
513                     o.close();
514                 }
515                 if (b != null) {
516                     b.close();
517                 }
518             }
519         } catch (IOException e) {
520             Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
521         }
522         return null;
523     }
524
525     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
526         private static AbstractRaftActorBehavior behavior;
527
528         public ForwardMessageToBehaviorActor(){
529
530         }
531
532         @Override public void onReceive(Object message) throws Exception {
533             super.onReceive(message);
534             behavior.handleMessage(sender(), message);
535         }
536
537         public static void setBehavior(AbstractRaftActorBehavior behavior){
538             ForwardMessageToBehaviorActor.behavior = behavior;
539         }
540     }
541
542     @Test
543     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
544         new JavaTestKit(getSystem()) {{
545
546             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
547
548             MockRaftActorContext leaderActorContext =
549                 new MockRaftActorContext("leader", getSystem(), leaderActor);
550
551             ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
552
553             MockRaftActorContext followerActorContext =
554                 new MockRaftActorContext("follower", getSystem(), followerActor);
555
556             Follower follower = new Follower(followerActorContext);
557
558             ForwardMessageToBehaviorActor.setBehavior(follower);
559
560             Map<String, String> peerAddresses = new HashMap();
561             peerAddresses.put(followerActor.path().toString(),
562                 followerActor.path().toString());
563
564             leaderActorContext.setPeerAddresses(peerAddresses);
565
566             leaderActorContext.getReplicatedLog().removeFrom(0);
567
568             //create 3 entries
569             leaderActorContext.setReplicatedLog(
570                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
571
572             leaderActorContext.setCommitIndex(1);
573
574             followerActorContext.getReplicatedLog().removeFrom(0);
575
576             // follower too has the exact same log entries and has the same commit index
577             followerActorContext.setReplicatedLog(
578                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
579
580             followerActorContext.setCommitIndex(1);
581
582             Leader leader = new Leader(leaderActorContext);
583
584             leader.handleMessage(leaderActor, new SendHeartBeat());
585
586             AppendEntriesMessages.AppendEntries appendEntries =
587                 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
588                     .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
589
590             assertNotNull(appendEntries);
591
592             assertEquals(1, appendEntries.getLeaderCommit());
593             assertEquals(1, appendEntries.getLogEntries(0).getIndex());
594             assertEquals(0, appendEntries.getPrevLogIndex());
595
596             AppendEntriesReply appendEntriesReply =
597                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
598                     leaderActor, AppendEntriesReply.class);
599
600             assertNotNull(appendEntriesReply);
601
602             // follower returns its next index
603             assertEquals(2, appendEntriesReply.getLogLastIndex());
604             assertEquals(1, appendEntriesReply.getLogLastTerm());
605
606         }};
607     }
608
609
610     @Test
611     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
612         new JavaTestKit(getSystem()) {{
613
614             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
615
616             MockRaftActorContext leaderActorContext =
617                 new MockRaftActorContext("leader", getSystem(), leaderActor);
618
619             ActorRef followerActor = getSystem().actorOf(
620                 Props.create(ForwardMessageToBehaviorActor.class));
621
622             MockRaftActorContext followerActorContext =
623                 new MockRaftActorContext("follower", getSystem(), followerActor);
624
625             Follower follower = new Follower(followerActorContext);
626
627             ForwardMessageToBehaviorActor.setBehavior(follower);
628
629             Map<String, String> peerAddresses = new HashMap();
630             peerAddresses.put(followerActor.path().toString(),
631                 followerActor.path().toString());
632
633             leaderActorContext.setPeerAddresses(peerAddresses);
634
635             leaderActorContext.getReplicatedLog().removeFrom(0);
636
637             leaderActorContext.setReplicatedLog(
638                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
639
640             leaderActorContext.setCommitIndex(1);
641
642             followerActorContext.getReplicatedLog().removeFrom(0);
643
644             followerActorContext.setReplicatedLog(
645                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
646
647             // follower has the same log entries but its commit index > leaders commit index
648             followerActorContext.setCommitIndex(2);
649
650             Leader leader = new Leader(leaderActorContext);
651
652             leader.handleMessage(leaderActor, new SendHeartBeat());
653
654             AppendEntriesMessages.AppendEntries appendEntries =
655                 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
656                     .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
657
658             assertNotNull(appendEntries);
659
660             assertEquals(1, appendEntries.getLeaderCommit());
661             assertEquals(1, appendEntries.getLogEntries(0).getIndex());
662             assertEquals(0, appendEntries.getPrevLogIndex());
663
664             AppendEntriesReply appendEntriesReply =
665                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
666                     leaderActor, AppendEntriesReply.class);
667
668             assertNotNull(appendEntriesReply);
669
670             assertEquals(2, appendEntriesReply.getLogLastIndex());
671             assertEquals(1, appendEntriesReply.getLogLastTerm());
672
673         }};
674     }
675
676     private static class LeaderTestKit extends JavaTestKit {
677
678         private LeaderTestKit(ActorSystem actorSystem) {
679             super(actorSystem);
680         }
681
682         protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
683             // Wait for a specific log message to show up
684             final boolean result =
685             new JavaTestKit.EventFilter<Boolean>(logLevel
686             ) {
687                 @Override
688                 protected Boolean run() {
689                     return true;
690                 }
691             }.from(subject.path().toString())
692                 .message(logMessage)
693                 .occurrences(1).exec();
694
695             Assert.assertEquals(true, result);
696
697         }
698     }
699
700     class MockLeader extends Leader {
701
702         FollowerToSnapshot fts;
703
704         public MockLeader(RaftActorContext context){
705             super(context);
706         }
707
708         public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) {
709             FollowerLogInformation followerLogInformation =
710                 new FollowerLogInformationImpl(followerId,
711                     new AtomicLong(nextIndex),
712                     new AtomicLong(matchIndex));
713             followerToLog.put(followerId, followerLogInformation);
714         }
715
716         public FollowerToSnapshot getFollowerToSnapshot() {
717             return fts;
718         }
719
720         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
721             fts = new FollowerToSnapshot(bs);
722             mapFollowerToSnapshot.put(followerId, fts);
723
724         }
725     }
726 }