Merge "Bug 2003: CDS serialization improvements"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.Props;
6 import akka.testkit.JavaTestKit;
7 import com.google.protobuf.ByteString;
8 import org.junit.Assert;
9 import org.junit.Test;
10 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
11 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
12 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
13 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
14 import org.opendaylight.controller.cluster.raft.RaftActorContext;
15 import org.opendaylight.controller.cluster.raft.RaftState;
16 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
17 import org.opendaylight.controller.cluster.raft.SerializationUtils;
18 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
19 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
20 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
21 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
22 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
23 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
24 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
25 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
26 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
27 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
28 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
29 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
30
31 import java.io.ByteArrayOutputStream;
32 import java.io.IOException;
33 import java.io.ObjectOutputStream;
34 import java.util.HashMap;
35 import java.util.Map;
36 import java.util.concurrent.atomic.AtomicLong;
37
38 import static org.junit.Assert.assertEquals;
39 import static org.junit.Assert.assertNotNull;
40 import static org.junit.Assert.assertTrue;
41
42 public class LeaderTest extends AbstractRaftActorBehaviorTest {
43
44     private ActorRef leaderActor =
45         getSystem().actorOf(Props.create(DoNothingActor.class));
46     private ActorRef senderActor =
47         getSystem().actorOf(Props.create(DoNothingActor.class));
48
49     @Test
50     public void testHandleMessageForUnknownMessage() throws Exception {
51         new JavaTestKit(getSystem()) {{
52             Leader leader =
53                 new Leader(createActorContext());
54
55             // handle message should return the Leader state when it receives an
56             // unknown message
57             RaftState state = leader.handleMessage(senderActor, "foo");
58             Assert.assertEquals(RaftState.Leader, state);
59         }};
60     }
61
62
63     @Test
64     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
65         new JavaTestKit(getSystem()) {{
66
67             new Within(duration("1 seconds")) {
68                 protected void run() {
69
70                     ActorRef followerActor = getTestActor();
71
72                     MockRaftActorContext actorContext =
73                         (MockRaftActorContext) createActorContext();
74
75                     Map<String, String> peerAddresses = new HashMap();
76
77                     peerAddresses.put(followerActor.path().toString(),
78                         followerActor.path().toString());
79
80                     actorContext.setPeerAddresses(peerAddresses);
81
82                     Leader leader = new Leader(actorContext);
83                     leader.handleMessage(senderActor, new SendHeartBeat());
84
85                     final String out =
86                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
87                             // do not put code outside this method, will run afterwards
88                             protected String match(Object in) {
89                                 Object msg = fromSerializableMessage(in);
90                                 if (msg instanceof AppendEntries) {
91                                     if (((AppendEntries)msg).getTerm() == 0) {
92                                         return "match";
93                                     }
94                                     return null;
95                                 } else {
96                                     throw noMatch();
97                                 }
98                             }
99                         }.get(); // this extracts the received message
100
101                     assertEquals("match", out);
102
103                 }
104             };
105         }};
106     }
107
108     @Test
109     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
110         new JavaTestKit(getSystem()) {{
111
112             new Within(duration("1 seconds")) {
113                 protected void run() {
114
115                     ActorRef followerActor = getTestActor();
116
117                     MockRaftActorContext actorContext =
118                         (MockRaftActorContext) createActorContext();
119
120                     Map<String, String> peerAddresses = new HashMap();
121
122                     peerAddresses.put(followerActor.path().toString(),
123                         followerActor.path().toString());
124
125                     actorContext.setPeerAddresses(peerAddresses);
126
127                     Leader leader = new Leader(actorContext);
128                     RaftState raftState = leader
129                         .handleMessage(senderActor, new Replicate(null, null,
130                             new MockRaftActorContext.MockReplicatedLogEntry(1,
131                                 100,
132                                 new MockRaftActorContext.MockPayload("foo"))
133                         ));
134
135                     // State should not change
136                     assertEquals(RaftState.Leader, raftState);
137
138                     final String out =
139                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
140                             // do not put code outside this method, will run afterwards
141                             protected String match(Object in) {
142                                 Object msg = fromSerializableMessage(in);
143                                 if (msg instanceof AppendEntries) {
144                                     if (((AppendEntries)msg).getTerm() == 0) {
145                                         return "match";
146                                     }
147                                     return null;
148                                 } else {
149                                     throw noMatch();
150                                 }
151                             }
152                         }.get(); // this extracts the received message
153
154                     assertEquals("match", out);
155
156                 }
157
158
159             };
160         }};
161     }
162
163     @Test
164     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
165         new JavaTestKit(getSystem()) {{
166
167             new Within(duration("1 seconds")) {
168                 protected void run() {
169
170                     ActorRef raftActor = getTestActor();
171
172                     MockRaftActorContext actorContext =
173                         new MockRaftActorContext("test", getSystem(), raftActor);
174
175                     actorContext.getReplicatedLog().removeFrom(0);
176
177                     actorContext.setReplicatedLog(
178                         new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
179                             .build());
180
181                     Leader leader = new Leader(actorContext);
182                     RaftState raftState = leader
183                         .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
184
185                     // State should not change
186                     assertEquals(RaftState.Leader, raftState);
187
188                     assertEquals(1, actorContext.getCommitIndex());
189
190                     final String out =
191                         new ExpectMsg<String>(duration("1 seconds"),
192                             "match hint") {
193                             // do not put code outside this method, will run afterwards
194                             protected String match(Object in) {
195                                 if (in instanceof ApplyState) {
196                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
197                                         return "match";
198                                     }
199                                     return null;
200                                 } else {
201                                     throw noMatch();
202                                 }
203                             }
204                         }.get(); // this extracts the received message
205
206                     assertEquals("match", out);
207
208                 }
209             };
210         }};
211     }
212
213     @Test
214     public void testSendInstallSnapshot() {
215         new LeaderTestKit(getSystem()) {{
216
217             new Within(duration("1 seconds")) {
218                 protected void run() {
219                     ActorRef followerActor = getTestActor();
220
221                     Map<String, String> peerAddresses = new HashMap();
222                     peerAddresses.put(followerActor.path().toString(),
223                         followerActor.path().toString());
224
225
226                     MockRaftActorContext actorContext =
227                         (MockRaftActorContext) createActorContext(getRef());
228                     actorContext.setPeerAddresses(peerAddresses);
229
230
231                     Map<String, String> leadersSnapshot = new HashMap<>();
232                     leadersSnapshot.put("1", "A");
233                     leadersSnapshot.put("2", "B");
234                     leadersSnapshot.put("3", "C");
235
236                     //clears leaders log
237                     actorContext.getReplicatedLog().removeFrom(0);
238
239                     final int followersLastIndex = 2;
240                     final int snapshotIndex = 3;
241                     final int newEntryIndex = 4;
242                     final int snapshotTerm = 1;
243                     final int currentTerm = 2;
244
245                     // set the snapshot variables in replicatedlog
246                     actorContext.getReplicatedLog().setSnapshot(
247                         toByteString(leadersSnapshot));
248                     actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
249                     actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
250
251                     MockLeader leader = new MockLeader(actorContext);
252                     // set the follower info in leader
253                     leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
254
255                     // new entry
256                     ReplicatedLogImplEntry entry =
257                         new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
258                             new MockRaftActorContext.MockPayload("D"));
259
260                     // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
261                     RaftState raftState = leader.handleMessage(
262                         senderActor, new Replicate(null, "state-id", entry));
263
264                     assertEquals(RaftState.Leader, raftState);
265
266                     // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
267                     Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
268                         @Override
269                         protected Boolean match(Object o) throws Exception {
270                             if (o instanceof SendInstallSnapshot) {
271                                 return true;
272                             }
273                             return false;
274                         }
275                     }.get();
276
277                     boolean sendInstallSnapshotReceived = false;
278                     for (Boolean b: matches) {
279                         sendInstallSnapshotReceived = b | sendInstallSnapshotReceived;
280                     }
281
282                     assertTrue(sendInstallSnapshotReceived);
283
284                 }
285             };
286         }};
287     }
288
289     @Test
290     public void testInstallSnapshot() {
291         new LeaderTestKit(getSystem()) {{
292
293             new Within(duration("1 seconds")) {
294                 protected void run() {
295                     ActorRef followerActor = getTestActor();
296
297                     Map<String, String> peerAddresses = new HashMap();
298                     peerAddresses.put(followerActor.path().toString(),
299                         followerActor.path().toString());
300
301                     MockRaftActorContext actorContext =
302                         (MockRaftActorContext) createActorContext();
303                     actorContext.setPeerAddresses(peerAddresses);
304
305
306                     Map<String, String> leadersSnapshot = new HashMap<>();
307                     leadersSnapshot.put("1", "A");
308                     leadersSnapshot.put("2", "B");
309                     leadersSnapshot.put("3", "C");
310
311                     //clears leaders log
312                     actorContext.getReplicatedLog().removeFrom(0);
313
314                     final int followersLastIndex = 2;
315                     final int snapshotIndex = 3;
316                     final int newEntryIndex = 4;
317                     final int snapshotTerm = 1;
318                     final int currentTerm = 2;
319
320                     // set the snapshot variables in replicatedlog
321                     actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot));
322                     actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
323                     actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
324
325                     actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
326
327                     MockLeader leader = new MockLeader(actorContext);
328                     // set the follower info in leader
329                     leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
330
331                     // new entry
332                     ReplicatedLogImplEntry entry =
333                         new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
334                             new MockRaftActorContext.MockPayload("D"));
335
336                     RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot());
337
338                     assertEquals(RaftState.Leader, raftState);
339
340                     // check if installsnapshot gets called with the correct values.
341                     final String out =
342                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
343                             // do not put code outside this method, will run afterwards
344                             protected String match(Object in) {
345                                 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
346                                     InstallSnapshot is = (InstallSnapshot)
347                                         SerializationUtils.fromSerializable(in);
348                                     if (is.getData() == null) {
349                                         return "InstallSnapshot data is null";
350                                     }
351                                     if (is.getLastIncludedIndex() != snapshotIndex) {
352                                         return is.getLastIncludedIndex() + "!=" + snapshotIndex;
353                                     }
354                                     if (is.getLastIncludedTerm() != snapshotTerm) {
355                                         return is.getLastIncludedTerm() + "!=" + snapshotTerm;
356                                     }
357                                     if (is.getTerm() == currentTerm) {
358                                         return is.getTerm() + "!=" + currentTerm;
359                                     }
360
361                                     return "match";
362
363                                } else {
364                                     return "message mismatch:" + in.getClass();
365                                 }
366                             }
367                         }.get(); // this extracts the received message
368
369                     assertEquals("match", out);
370                 }
371             };
372         }};
373     }
374
375     @Test
376     public void testHandleInstallSnapshotReplyLastChunk() {
377         new LeaderTestKit(getSystem()) {{
378             new Within(duration("1 seconds")) {
379                 protected void run() {
380                     ActorRef followerActor = getTestActor();
381
382                     Map<String, String> peerAddresses = new HashMap();
383                     peerAddresses.put(followerActor.path().toString(),
384                         followerActor.path().toString());
385
386                     MockRaftActorContext actorContext =
387                         (MockRaftActorContext) createActorContext();
388                     actorContext.setPeerAddresses(peerAddresses);
389
390                     final int followersLastIndex = 2;
391                     final int snapshotIndex = 3;
392                     final int newEntryIndex = 4;
393                     final int snapshotTerm = 1;
394                     final int currentTerm = 2;
395
396                     MockLeader leader = new MockLeader(actorContext);
397                     // set the follower info in leader
398                     leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
399
400                     Map<String, String> leadersSnapshot = new HashMap<>();
401                     leadersSnapshot.put("1", "A");
402                     leadersSnapshot.put("2", "B");
403                     leadersSnapshot.put("3", "C");
404
405                     // set the snapshot variables in replicatedlog
406                     actorContext.getReplicatedLog().setSnapshot(
407                         toByteString(leadersSnapshot));
408                     actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
409                     actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
410                     actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
411
412                     ByteString bs = toByteString(leadersSnapshot);
413                     leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
414                     while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
415                         leader.getFollowerToSnapshot().getNextChunk();
416                         leader.getFollowerToSnapshot().incrementChunkIndex();
417                     }
418
419                     //clears leaders log
420                     actorContext.getReplicatedLog().removeFrom(0);
421
422                     RaftState raftState = leader.handleMessage(senderActor,
423                         new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
424                             leader.getFollowerToSnapshot().getChunkIndex(), true));
425
426                     assertEquals(RaftState.Leader, raftState);
427
428                     assertEquals(leader.mapFollowerToSnapshot.size(), 0);
429                     assertEquals(leader.followerToLog.size(), 1);
430                     assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
431                     FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
432                     assertEquals(snapshotIndex, fli.getMatchIndex().get());
433                     assertEquals(snapshotIndex, fli.getMatchIndex().get());
434                     assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
435                 }
436             };
437         }};
438     }
439
440     @Test
441     public void testFollowerToSnapshotLogic() {
442
443         MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
444
445         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
446             @Override
447             public int getSnapshotChunkSize() {
448                 return 50;
449             }
450         });
451
452         MockLeader leader = new MockLeader(actorContext);
453
454         Map<String, String> leadersSnapshot = new HashMap<>();
455         leadersSnapshot.put("1", "A");
456         leadersSnapshot.put("2", "B");
457         leadersSnapshot.put("3", "C");
458
459         ByteString bs = toByteString(leadersSnapshot);
460         byte[] barray = bs.toByteArray();
461
462         leader.createFollowerToSnapshot("followerId", bs);
463         assertEquals(bs.size(), barray.length);
464
465         int chunkIndex=0;
466         for (int i=0; i < barray.length; i = i + 50) {
467             int j = i + 50;
468             chunkIndex++;
469
470             if (i + 50 > barray.length) {
471                 j = barray.length;
472             }
473
474             ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
475             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
476             assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
477
478             leader.getFollowerToSnapshot().markSendStatus(true);
479             if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
480                 leader.getFollowerToSnapshot().incrementChunkIndex();
481             }
482         }
483
484         assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
485     }
486
487
488     @Override protected RaftActorBehavior createBehavior(
489         RaftActorContext actorContext) {
490         return new Leader(actorContext);
491     }
492
493     @Override protected RaftActorContext createActorContext() {
494         return createActorContext(leaderActor);
495     }
496
497     protected RaftActorContext createActorContext(ActorRef actorRef) {
498         return new MockRaftActorContext("test", getSystem(), actorRef);
499     }
500
501     private ByteString toByteString(Map<String, String> state) {
502         ByteArrayOutputStream b = null;
503         ObjectOutputStream o = null;
504         try {
505             try {
506                 b = new ByteArrayOutputStream();
507                 o = new ObjectOutputStream(b);
508                 o.writeObject(state);
509                 byte[] snapshotBytes = b.toByteArray();
510                 return ByteString.copyFrom(snapshotBytes);
511             } finally {
512                 if (o != null) {
513                     o.flush();
514                     o.close();
515                 }
516                 if (b != null) {
517                     b.close();
518                 }
519             }
520         } catch (IOException e) {
521             Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
522         }
523         return null;
524     }
525
526     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
527         private static AbstractRaftActorBehavior behavior;
528
529         public ForwardMessageToBehaviorActor(){
530
531         }
532
533         @Override public void onReceive(Object message) throws Exception {
534             super.onReceive(message);
535             behavior.handleMessage(sender(), message);
536         }
537
538         public static void setBehavior(AbstractRaftActorBehavior behavior){
539             ForwardMessageToBehaviorActor.behavior = behavior;
540         }
541     }
542
543     @Test
544     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
545         new JavaTestKit(getSystem()) {{
546
547             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
548
549             MockRaftActorContext leaderActorContext =
550                 new MockRaftActorContext("leader", getSystem(), leaderActor);
551
552             ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
553
554             MockRaftActorContext followerActorContext =
555                 new MockRaftActorContext("follower", getSystem(), followerActor);
556
557             Follower follower = new Follower(followerActorContext);
558
559             ForwardMessageToBehaviorActor.setBehavior(follower);
560
561             Map<String, String> peerAddresses = new HashMap();
562             peerAddresses.put(followerActor.path().toString(),
563                 followerActor.path().toString());
564
565             leaderActorContext.setPeerAddresses(peerAddresses);
566
567             leaderActorContext.getReplicatedLog().removeFrom(0);
568
569             //create 3 entries
570             leaderActorContext.setReplicatedLog(
571                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
572
573             leaderActorContext.setCommitIndex(1);
574
575             followerActorContext.getReplicatedLog().removeFrom(0);
576
577             // follower too has the exact same log entries and has the same commit index
578             followerActorContext.setReplicatedLog(
579                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
580
581             followerActorContext.setCommitIndex(1);
582
583             Leader leader = new Leader(leaderActorContext);
584
585             leader.handleMessage(leaderActor, new SendHeartBeat());
586
587             AppendEntriesMessages.AppendEntries appendEntries =
588                 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
589                     .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
590
591             assertNotNull(appendEntries);
592
593             assertEquals(1, appendEntries.getLeaderCommit());
594             assertEquals(1, appendEntries.getLogEntries(0).getIndex());
595             assertEquals(0, appendEntries.getPrevLogIndex());
596
597             AppendEntriesReply appendEntriesReply =
598                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
599                     leaderActor, AppendEntriesReply.class);
600
601             assertNotNull(appendEntriesReply);
602
603             // follower returns its next index
604             assertEquals(2, appendEntriesReply.getLogLastIndex());
605             assertEquals(1, appendEntriesReply.getLogLastTerm());
606
607         }};
608     }
609
610
611     @Test
612     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
613         new JavaTestKit(getSystem()) {{
614
615             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
616
617             MockRaftActorContext leaderActorContext =
618                 new MockRaftActorContext("leader", getSystem(), leaderActor);
619
620             ActorRef followerActor = getSystem().actorOf(
621                 Props.create(ForwardMessageToBehaviorActor.class));
622
623             MockRaftActorContext followerActorContext =
624                 new MockRaftActorContext("follower", getSystem(), followerActor);
625
626             Follower follower = new Follower(followerActorContext);
627
628             ForwardMessageToBehaviorActor.setBehavior(follower);
629
630             Map<String, String> peerAddresses = new HashMap();
631             peerAddresses.put(followerActor.path().toString(),
632                 followerActor.path().toString());
633
634             leaderActorContext.setPeerAddresses(peerAddresses);
635
636             leaderActorContext.getReplicatedLog().removeFrom(0);
637
638             leaderActorContext.setReplicatedLog(
639                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
640
641             leaderActorContext.setCommitIndex(1);
642
643             followerActorContext.getReplicatedLog().removeFrom(0);
644
645             followerActorContext.setReplicatedLog(
646                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
647
648             // follower has the same log entries but its commit index > leaders commit index
649             followerActorContext.setCommitIndex(2);
650
651             Leader leader = new Leader(leaderActorContext);
652
653             leader.handleMessage(leaderActor, new SendHeartBeat());
654
655             AppendEntriesMessages.AppendEntries appendEntries =
656                 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
657                     .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
658
659             assertNotNull(appendEntries);
660
661             assertEquals(1, appendEntries.getLeaderCommit());
662             assertEquals(1, appendEntries.getLogEntries(0).getIndex());
663             assertEquals(0, appendEntries.getPrevLogIndex());
664
665             AppendEntriesReply appendEntriesReply =
666                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
667                     leaderActor, AppendEntriesReply.class);
668
669             assertNotNull(appendEntriesReply);
670
671             assertEquals(2, appendEntriesReply.getLogLastIndex());
672             assertEquals(1, appendEntriesReply.getLogLastTerm());
673
674         }};
675     }
676
677     private static class LeaderTestKit extends JavaTestKit {
678
679         private LeaderTestKit(ActorSystem actorSystem) {
680             super(actorSystem);
681         }
682
683         protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
684             // Wait for a specific log message to show up
685             final boolean result =
686             new JavaTestKit.EventFilter<Boolean>(logLevel
687             ) {
688                 @Override
689                 protected Boolean run() {
690                     return true;
691                 }
692             }.from(subject.path().toString())
693                 .message(logMessage)
694                 .occurrences(1).exec();
695
696             Assert.assertEquals(true, result);
697
698         }
699     }
700
701     class MockLeader extends Leader {
702
703         FollowerToSnapshot fts;
704
705         public MockLeader(RaftActorContext context){
706             super(context);
707         }
708
709         public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) {
710             FollowerLogInformation followerLogInformation =
711                 new FollowerLogInformationImpl(followerId,
712                     new AtomicLong(nextIndex),
713                     new AtomicLong(matchIndex));
714             followerToLog.put(followerId, followerLogInformation);
715         }
716
717         public FollowerToSnapshot getFollowerToSnapshot() {
718             return fts;
719         }
720
721         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
722             fts = new FollowerToSnapshot(bs);
723             mapFollowerToSnapshot.put(followerId, fts);
724
725         }
726     }
727 }