BUG 2509 : Removing all journal entries from a Followers in-memory journal causes...
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.Props;
8 import akka.testkit.JavaTestKit;
9 import com.google.protobuf.ByteString;
10 import java.io.ByteArrayOutputStream;
11 import java.io.IOException;
12 import java.io.ObjectOutputStream;
13 import java.util.ArrayList;
14 import java.util.Arrays;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import org.junit.Test;
19 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
20 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
21 import org.opendaylight.controller.cluster.raft.RaftActorContext;
22 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
23 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
24 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
25 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
27 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
29 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
30 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
31 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
32 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
33
34 public class FollowerTest extends AbstractRaftActorBehaviorTest {
35
36     private final ActorRef followerActor = getSystem().actorOf(Props.create(
37         DoNothingActor.class));
38
39
40     @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
41         return new Follower(actorContext);
42     }
43
44     @Override protected  RaftActorContext createActorContext() {
45         return createActorContext(followerActor);
46     }
47
48     protected  RaftActorContext createActorContext(ActorRef actorRef){
49         return new MockRaftActorContext("test", getSystem(), actorRef);
50     }
51
52     @Test
53     public void testThatAnElectionTimeoutIsTriggered(){
54         new JavaTestKit(getSystem()) {{
55
56             new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
57                 protected void run() {
58
59                     Follower follower = new Follower(createActorContext(getTestActor()));
60
61                     final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
62                         // do not put code outside this method, will run afterwards
63                         protected Boolean match(Object in) {
64                             if (in instanceof ElectionTimeout) {
65                                 return true;
66                             } else {
67                                 throw noMatch();
68                             }
69                         }
70                     }.get();
71
72                     assertEquals(true, out);
73                 }
74             };
75         }};
76     }
77
78     @Test
79     public void testHandleElectionTimeout(){
80         RaftActorContext raftActorContext = createActorContext();
81         Follower follower =
82             new Follower(raftActorContext);
83
84         RaftActorBehavior raftBehavior =
85             follower.handleMessage(followerActor, new ElectionTimeout());
86
87         assertTrue(raftBehavior instanceof Candidate);
88     }
89
90     @Test
91     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
92         new JavaTestKit(getSystem()) {{
93
94             new Within(duration("1 seconds")) {
95                 protected void run() {
96
97                     RaftActorContext context = createActorContext(getTestActor());
98
99                     context.getTermInformation().update(1000, null);
100
101                     RaftActorBehavior follower = createBehavior(context);
102
103                     follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
104
105                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
106                         // do not put code outside this method, will run afterwards
107                         protected Boolean match(Object in) {
108                             if (in instanceof RequestVoteReply) {
109                                 RequestVoteReply reply = (RequestVoteReply) in;
110                                 return reply.isVoteGranted();
111                             } else {
112                                 throw noMatch();
113                             }
114                         }
115                     }.get();
116
117                     assertEquals(true, out);
118                 }
119             };
120         }};
121     }
122
123     @Test
124     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
125         new JavaTestKit(getSystem()) {{
126
127             new Within(duration("1 seconds")) {
128                 protected void run() {
129
130                     RaftActorContext context = createActorContext(getTestActor());
131
132                     context.getTermInformation().update(1000, "test");
133
134                     RaftActorBehavior follower = createBehavior(context);
135
136                     follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999));
137
138                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
139                         // do not put code outside this method, will run afterwards
140                         protected Boolean match(Object in) {
141                             if (in instanceof RequestVoteReply) {
142                                 RequestVoteReply reply = (RequestVoteReply) in;
143                                 return reply.isVoteGranted();
144                             } else {
145                                 throw noMatch();
146                             }
147                         }
148                     }.get();
149
150                     assertEquals(false, out);
151                 }
152             };
153         }};
154     }
155
156     /**
157      * This test verifies that when an AppendEntries RPC is received by a RaftActor
158      * with a commitIndex that is greater than what has been applied to the
159      * state machine of the RaftActor, the RaftActor applies the state and
160      * sets it current applied state to the commitIndex of the sender.
161      *
162      * @throws Exception
163      */
164     @Test
165     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
166         new JavaTestKit(getSystem()) {{
167
168             RaftActorContext context =
169                 createActorContext();
170
171             context.setLastApplied(100);
172             setLastLogEntry((MockRaftActorContext) context, 1, 100,
173                 new MockRaftActorContext.MockPayload(""));
174             ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
175
176             List<ReplicatedLogEntry> entries =
177                 Arrays.asList(
178                         (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
179                                 new MockRaftActorContext.MockPayload("foo"))
180                 );
181
182             // The new commitIndex is 101
183             AppendEntries appendEntries =
184                 new AppendEntries(2, "leader-1", 100, 1, entries, 101);
185
186             RaftActorBehavior raftBehavior =
187                 createBehavior(context).handleMessage(getRef(), appendEntries);
188
189             assertEquals(101L, context.getLastApplied());
190
191         }};
192     }
193
194     /**
195      * This test verifies that when an AppendEntries is received a specific prevLogTerm
196      * which does not match the term that is in RaftActors log entry at prevLogIndex
197      * then the RaftActor does not change it's state and it returns a failure.
198      *
199      * @throws Exception
200      */
201     @Test
202     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
203         throws Exception {
204         new JavaTestKit(getSystem()) {{
205
206             MockRaftActorContext context = (MockRaftActorContext)
207                 createActorContext();
208
209             // First set the receivers term to lower number
210             context.getTermInformation().update(95, "test");
211
212             // Set the last log entry term for the receiver to be greater than
213             // what we will be sending as the prevLogTerm in AppendEntries
214             MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog =
215                 setLastLogEntry(context, 20, 0, new MockRaftActorContext.MockPayload(""));
216
217             // AppendEntries is now sent with a bigger term
218             // this will set the receivers term to be the same as the sender's term
219             AppendEntries appendEntries =
220                 new AppendEntries(100, "leader-1", 0, 0, null, 101);
221
222             RaftActorBehavior behavior = createBehavior(context);
223
224             // Send an unknown message so that the state of the RaftActor remains unchanged
225             RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
226
227             RaftActorBehavior raftBehavior =
228                 behavior.handleMessage(getRef(), appendEntries);
229
230             assertEquals(expected, raftBehavior);
231
232             // Also expect an AppendEntriesReply to be sent where success is false
233             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
234                 "AppendEntriesReply") {
235                 // do not put code outside this method, will run afterwards
236                 protected Boolean match(Object in) {
237                     if (in instanceof AppendEntriesReply) {
238                         AppendEntriesReply reply = (AppendEntriesReply) in;
239                         return reply.isSuccess();
240                     } else {
241                         throw noMatch();
242                     }
243                 }
244             }.get();
245
246             assertEquals(false, out);
247
248
249         }};
250     }
251
252
253
254     /**
255      * This test verifies that when a new AppendEntries message is received with
256      * new entries and the logs of the sender and receiver match that the new
257      * entries get added to the log and the log is incremented by the number of
258      * entries received in appendEntries
259      *
260      * @throws Exception
261      */
262     @Test
263     public void testHandleAppendEntriesAddNewEntries() throws Exception {
264         new JavaTestKit(getSystem()) {{
265
266             MockRaftActorContext context = (MockRaftActorContext)
267                 createActorContext();
268
269             // First set the receivers term to lower number
270             context.getTermInformation().update(1, "test");
271
272             // Prepare the receivers log
273             MockRaftActorContext.SimpleReplicatedLog log =
274                 new MockRaftActorContext.SimpleReplicatedLog();
275             log.append(
276                 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
277             log.append(
278                 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
279             log.append(
280                 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
281
282             context.setReplicatedLog(log);
283
284             // Prepare the entries to be sent with AppendEntries
285             List<ReplicatedLogEntry> entries = new ArrayList<>();
286             entries.add(
287                 new MockRaftActorContext.MockReplicatedLogEntry(1, 3, new MockRaftActorContext.MockPayload("three")));
288             entries.add(
289                 new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("four")));
290
291             // Send appendEntries with the same term as was set on the receiver
292             // before the new behavior was created (1 in this case)
293             // This will not work for a Candidate because as soon as a Candidate
294             // is created it increments the term
295             AppendEntries appendEntries =
296                 new AppendEntries(1, "leader-1", 2, 1, entries, 4);
297
298             RaftActorBehavior behavior = createBehavior(context);
299
300             // Send an unknown message so that the state of the RaftActor remains unchanged
301             RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
302
303             RaftActorBehavior raftBehavior =
304                 behavior.handleMessage(getRef(), appendEntries);
305
306             assertEquals(expected, raftBehavior);
307             assertEquals(5, log.last().getIndex() + 1);
308             assertNotNull(log.get(3));
309             assertNotNull(log.get(4));
310
311             // Also expect an AppendEntriesReply to be sent where success is false
312             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
313                 "AppendEntriesReply") {
314                 // do not put code outside this method, will run afterwards
315                 protected Boolean match(Object in) {
316                     if (in instanceof AppendEntriesReply) {
317                         AppendEntriesReply reply = (AppendEntriesReply) in;
318                         return reply.isSuccess();
319                     } else {
320                         throw noMatch();
321                     }
322                 }
323             }.get();
324
325             assertEquals(true, out);
326
327
328         }};
329     }
330
331
332
333     /**
334      * This test verifies that when a new AppendEntries message is received with
335      * new entries and the logs of the sender and receiver are out-of-sync that
336      * the log is first corrected by removing the out of sync entries from the
337      * log and then adding in the new entries sent with the AppendEntries message
338      *
339      * @throws Exception
340      */
341     @Test
342     public void testHandleAppendEntriesCorrectReceiverLogEntries()
343         throws Exception {
344         new JavaTestKit(getSystem()) {{
345
346             MockRaftActorContext context = (MockRaftActorContext)
347                 createActorContext();
348
349             // First set the receivers term to lower number
350             context.getTermInformation().update(2, "test");
351
352             // Prepare the receivers log
353             MockRaftActorContext.SimpleReplicatedLog log =
354                 new MockRaftActorContext.SimpleReplicatedLog();
355             log.append(
356                 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
357             log.append(
358                 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
359             log.append(
360                 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
361
362             context.setReplicatedLog(log);
363
364             // Prepare the entries to be sent with AppendEntries
365             List<ReplicatedLogEntry> entries = new ArrayList<>();
366             entries.add(
367                 new MockRaftActorContext.MockReplicatedLogEntry(2, 2, new MockRaftActorContext.MockPayload("two-1")));
368             entries.add(
369                 new MockRaftActorContext.MockReplicatedLogEntry(2, 3, new MockRaftActorContext.MockPayload("three")));
370
371             // Send appendEntries with the same term as was set on the receiver
372             // before the new behavior was created (1 in this case)
373             // This will not work for a Candidate because as soon as a Candidate
374             // is created it increments the term
375             AppendEntries appendEntries =
376                 new AppendEntries(2, "leader-1", 1, 1, entries, 3);
377
378             RaftActorBehavior behavior = createBehavior(context);
379
380             // Send an unknown message so that the state of the RaftActor remains unchanged
381             RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
382
383             RaftActorBehavior raftBehavior =
384                 behavior.handleMessage(getRef(), appendEntries);
385
386             assertEquals(expected, raftBehavior);
387
388             // The entry at index 2 will be found out-of-sync with the leader
389             // and will be removed
390             // Then the two new entries will be added to the log
391             // Thus making the log to have 4 entries
392             assertEquals(4, log.last().getIndex() + 1);
393             assertNotNull(log.get(2));
394
395             assertEquals("one", log.get(1).getData().toString());
396
397             // Check that the entry at index 2 has the new data
398             assertEquals("two-1", log.get(2).getData().toString());
399
400             assertEquals("three", log.get(3).getData().toString());
401
402             assertNotNull(log.get(3));
403
404             // Also expect an AppendEntriesReply to be sent where success is false
405             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
406                 "AppendEntriesReply") {
407                 // do not put code outside this method, will run afterwards
408                 protected Boolean match(Object in) {
409                     if (in instanceof AppendEntriesReply) {
410                         AppendEntriesReply reply = (AppendEntriesReply) in;
411                         return reply.isSuccess();
412                     } else {
413                         throw noMatch();
414                     }
415                 }
416             }.get();
417
418             assertEquals(true, out);
419
420
421         }};
422     }
423
424     @Test
425     public void testHandleAppendEntriesPreviousLogEntryMissing(){
426         new JavaTestKit(getSystem()) {{
427
428             MockRaftActorContext context = (MockRaftActorContext)
429                     createActorContext();
430
431             // Prepare the receivers log
432             MockRaftActorContext.SimpleReplicatedLog log =
433                     new MockRaftActorContext.SimpleReplicatedLog();
434             log.append(
435                     new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
436             log.append(
437                     new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
438             log.append(
439                     new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
440
441             context.setReplicatedLog(log);
442
443             // Prepare the entries to be sent with AppendEntries
444             List<ReplicatedLogEntry> entries = new ArrayList<>();
445             entries.add(
446                     new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
447
448             AppendEntries appendEntries =
449                     new AppendEntries(1, "leader-1", 3, 1, entries, 4);
450
451             RaftActorBehavior behavior = createBehavior(context);
452
453             // Send an unknown message so that the state of the RaftActor remains unchanged
454             RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
455
456             RaftActorBehavior raftBehavior =
457                     behavior.handleMessage(getRef(), appendEntries);
458
459             assertEquals(expected, raftBehavior);
460
461             // Also expect an AppendEntriesReply to be sent where success is false
462             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
463                     "AppendEntriesReply") {
464                 // do not put code outside this method, will run afterwards
465                 protected Boolean match(Object in) {
466                     if (in instanceof AppendEntriesReply) {
467                         AppendEntriesReply reply = (AppendEntriesReply) in;
468                         return reply.isSuccess();
469                     } else {
470                         throw noMatch();
471                     }
472                 }
473             }.get();
474
475             assertEquals(false, out);
476
477         }};
478
479     }
480
481     @Test
482     public void testHandleAppendAfterInstallingSnapshot(){
483         new JavaTestKit(getSystem()) {{
484
485             MockRaftActorContext context = (MockRaftActorContext)
486                     createActorContext();
487
488
489             // Prepare the receivers log
490             MockRaftActorContext.SimpleReplicatedLog log =
491                     new MockRaftActorContext.SimpleReplicatedLog();
492
493             // Set up a log as if it has been snapshotted
494             log.setSnapshotIndex(3);
495             log.setSnapshotTerm(1);
496
497             context.setReplicatedLog(log);
498
499             // Prepare the entries to be sent with AppendEntries
500             List<ReplicatedLogEntry> entries = new ArrayList<>();
501             entries.add(
502                     new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
503
504             AppendEntries appendEntries =
505                     new AppendEntries(1, "leader-1", 3, 1, entries, 4);
506
507             RaftActorBehavior behavior = createBehavior(context);
508
509             // Send an unknown message so that the state of the RaftActor remains unchanged
510             RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
511
512             RaftActorBehavior raftBehavior =
513                     behavior.handleMessage(getRef(), appendEntries);
514
515             assertEquals(expected, raftBehavior);
516
517             // Also expect an AppendEntriesReply to be sent where success is false
518             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
519                     "AppendEntriesReply") {
520                 // do not put code outside this method, will run afterwards
521                 protected Boolean match(Object in) {
522                     if (in instanceof AppendEntriesReply) {
523                         AppendEntriesReply reply = (AppendEntriesReply) in;
524                         return reply.isSuccess();
525                     } else {
526                         throw noMatch();
527                     }
528                 }
529             }.get();
530
531             assertEquals(true, out);
532
533         }};
534
535     }
536
537
538     /**
539      * This test verifies that when InstallSnapshot is received by
540      * the follower its applied correctly.
541      *
542      * @throws Exception
543      */
544     @Test
545     public void testHandleInstallSnapshot() throws Exception {
546         JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{
547
548             ActorRef leaderActor = getSystem().actorOf(Props.create(
549                 MessageCollectorActor.class));
550
551             MockRaftActorContext context = (MockRaftActorContext)
552                 createActorContext(getRef());
553
554             Follower follower = (Follower)createBehavior(context);
555
556             HashMap<String, String> followerSnapshot = new HashMap<>();
557             followerSnapshot.put("1", "A");
558             followerSnapshot.put("2", "B");
559             followerSnapshot.put("3", "C");
560
561             ByteString bsSnapshot  = toByteString(followerSnapshot);
562             ByteString chunkData = ByteString.EMPTY;
563             int offset = 0;
564             int snapshotLength = bsSnapshot.size();
565             int i = 1;
566             int chunkIndex = 1;
567
568             do {
569                 chunkData = getNextChunk(bsSnapshot, offset);
570                 final InstallSnapshot installSnapshot =
571                     new InstallSnapshot(1, "leader-1", i, 1,
572                         chunkData, chunkIndex, 3);
573                 follower.handleMessage(leaderActor, installSnapshot);
574                 offset = offset + 50;
575                 i++;
576                 chunkIndex++;
577             } while ((offset+50) < snapshotLength);
578
579             final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, chunkIndex, 3);
580             follower.handleMessage(leaderActor, installSnapshot3);
581
582             String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
583                 @Override
584                 protected String match(Object o) throws Exception {
585                     if (o instanceof ApplySnapshot) {
586                         ApplySnapshot as = (ApplySnapshot)o;
587                         if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) {
588                             return "applySnapshot-lastIndex-mismatch";
589                         }
590                         if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) {
591                             return "applySnapshot-lastAppliedTerm-mismatch";
592                         }
593                         if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) {
594                             return "applySnapshot-lastAppliedIndex-mismatch";
595                         }
596                         if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) {
597                             return "applySnapshot-lastTerm-mismatch";
598                         }
599                         return "applySnapshot";
600                     }
601
602                     return "ignoreCase";
603                 }
604             }.get();
605
606             // Verify that after a snapshot is successfully applied the collected snapshot chunks is reset to empty
607             assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
608
609             String applySnapshotMatch = "";
610             for (String reply: matches) {
611                 if (reply.startsWith("applySnapshot")) {
612                     applySnapshotMatch = reply;
613                 }
614             }
615
616             assertEquals("applySnapshot", applySnapshotMatch);
617
618             Object messages = executeLocalOperation(leaderActor, "get-all-messages");
619
620             assertNotNull(messages);
621             assertTrue(messages instanceof List);
622             List<Object> listMessages = (List<Object>) messages;
623
624             int installSnapshotReplyReceivedCount = 0;
625             for (Object message: listMessages) {
626                 if (message instanceof InstallSnapshotReply) {
627                     ++installSnapshotReplyReceivedCount;
628                 }
629             }
630
631             assertEquals(3, installSnapshotReplyReceivedCount);
632
633         }};
634     }
635
636     @Test
637     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
638         JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {
639             {
640
641                 ActorRef leaderActor = getSystem().actorOf(Props.create(
642                         MessageCollectorActor.class));
643
644                 MockRaftActorContext context = (MockRaftActorContext)
645                         createActorContext(getRef());
646
647                 Follower follower = (Follower) createBehavior(context);
648
649                 HashMap<String, String> followerSnapshot = new HashMap<>();
650                 followerSnapshot.put("1", "A");
651                 followerSnapshot.put("2", "B");
652                 followerSnapshot.put("3", "C");
653
654                 ByteString bsSnapshot = toByteString(followerSnapshot);
655
656                 final InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader-1", 3, 1, getNextChunk(bsSnapshot, 10), 3, 3);
657                 follower.handleMessage(leaderActor, installSnapshot);
658
659                 Object messages = executeLocalOperation(leaderActor, "get-all-messages");
660
661                 assertNotNull(messages);
662                 assertTrue(messages instanceof List);
663                 List<Object> listMessages = (List<Object>) messages;
664
665                 int installSnapshotReplyReceivedCount = 0;
666                 for (Object message: listMessages) {
667                     if (message instanceof InstallSnapshotReply) {
668                         ++installSnapshotReplyReceivedCount;
669                     }
670                 }
671
672                 assertEquals(1, installSnapshotReplyReceivedCount);
673                 InstallSnapshotReply reply = (InstallSnapshotReply) listMessages.get(0);
674                 assertEquals(false, reply.isSuccess());
675                 assertEquals(-1, reply.getChunkIndex());
676                 assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
677
678
679             }};
680     }
681
682     public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
683         return MessageCollectorActor.getAllMessages(actor);
684     }
685
686     public ByteString getNextChunk (ByteString bs, int offset){
687         int snapshotLength = bs.size();
688         int start = offset;
689         int size = 50;
690         if (50 > snapshotLength) {
691             size = snapshotLength;
692         } else {
693             if ((start + 50) > snapshotLength) {
694                 size = snapshotLength - start;
695             }
696         }
697         return bs.substring(start, start + size);
698     }
699
700     private ByteString toByteString(Map<String, String> state) {
701         ByteArrayOutputStream b = null;
702         ObjectOutputStream o = null;
703         try {
704             try {
705                 b = new ByteArrayOutputStream();
706                 o = new ObjectOutputStream(b);
707                 o.writeObject(state);
708                 byte[] snapshotBytes = b.toByteArray();
709                 return ByteString.copyFrom(snapshotBytes);
710             } finally {
711                 if (o != null) {
712                     o.flush();
713                     o.close();
714                 }
715                 if (b != null) {
716                     b.close();
717                 }
718             }
719         } catch (IOException e) {
720             org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
721         }
722         return null;
723     }
724 }