Merge "Refactor LeaderTest"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.Props;
8 import akka.testkit.JavaTestKit;
9 import com.google.protobuf.ByteString;
10 import java.io.ByteArrayOutputStream;
11 import java.io.IOException;
12 import java.io.ObjectOutputStream;
13 import java.util.ArrayList;
14 import java.util.Arrays;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import org.junit.Test;
19 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
20 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
21 import org.opendaylight.controller.cluster.raft.RaftActorContext;
22 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
23 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
24 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
25 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
27 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
29 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
30 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
31 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
32 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
33
34 public class FollowerTest extends AbstractRaftActorBehaviorTest {
35
36     private final ActorRef followerActor = getSystem().actorOf(Props.create(
37         DoNothingActor.class));
38
39
40     @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
41         return new Follower(actorContext);
42     }
43
44     @Override
45     protected  MockRaftActorContext createActorContext() {
46         return createActorContext(followerActor);
47     }
48
49     @Override
50     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
51         return new MockRaftActorContext("test", getSystem(), actorRef);
52     }
53
54     @Test
55     public void testThatAnElectionTimeoutIsTriggered(){
56         new JavaTestKit(getSystem()) {{
57
58             new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
59                 @Override
60                 protected void run() {
61
62                     Follower follower = new Follower(createActorContext(getTestActor()));
63
64                     final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
65                         // do not put code outside this method, will run afterwards
66                         @Override
67                         protected Boolean match(Object in) {
68                             if (in instanceof ElectionTimeout) {
69                                 return true;
70                             } else {
71                                 throw noMatch();
72                             }
73                         }
74                     }.get();
75
76                     assertEquals(true, out);
77                 }
78             };
79         }};
80     }
81
82     @Test
83     public void testHandleElectionTimeout(){
84         RaftActorContext raftActorContext = createActorContext();
85         Follower follower =
86             new Follower(raftActorContext);
87
88         RaftActorBehavior raftBehavior =
89             follower.handleMessage(followerActor, new ElectionTimeout());
90
91         assertTrue(raftBehavior instanceof Candidate);
92     }
93
94     @Test
95     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
96         new JavaTestKit(getSystem()) {{
97
98             new Within(duration("1 seconds")) {
99                 @Override
100                 protected void run() {
101
102                     RaftActorContext context = createActorContext(getTestActor());
103
104                     context.getTermInformation().update(1000, null);
105
106                     RaftActorBehavior follower = createBehavior(context);
107
108                     follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
109
110                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
111                         // do not put code outside this method, will run afterwards
112                         @Override
113                         protected Boolean match(Object in) {
114                             if (in instanceof RequestVoteReply) {
115                                 RequestVoteReply reply = (RequestVoteReply) in;
116                                 return reply.isVoteGranted();
117                             } else {
118                                 throw noMatch();
119                             }
120                         }
121                     }.get();
122
123                     assertEquals(true, out);
124                 }
125             };
126         }};
127     }
128
129     @Test
130     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
131         new JavaTestKit(getSystem()) {{
132
133             new Within(duration("1 seconds")) {
134                 @Override
135                 protected void run() {
136
137                     RaftActorContext context = createActorContext(getTestActor());
138
139                     context.getTermInformation().update(1000, "test");
140
141                     RaftActorBehavior follower = createBehavior(context);
142
143                     follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999));
144
145                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
146                         // do not put code outside this method, will run afterwards
147                         @Override
148                         protected Boolean match(Object in) {
149                             if (in instanceof RequestVoteReply) {
150                                 RequestVoteReply reply = (RequestVoteReply) in;
151                                 return reply.isVoteGranted();
152                             } else {
153                                 throw noMatch();
154                             }
155                         }
156                     }.get();
157
158                     assertEquals(false, out);
159                 }
160             };
161         }};
162     }
163
164     /**
165      * This test verifies that when an AppendEntries RPC is received by a RaftActor
166      * with a commitIndex that is greater than what has been applied to the
167      * state machine of the RaftActor, the RaftActor applies the state and
168      * sets it current applied state to the commitIndex of the sender.
169      *
170      * @throws Exception
171      */
172     @Test
173     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
174         new JavaTestKit(getSystem()) {{
175
176             RaftActorContext context =
177                 createActorContext();
178
179             context.setLastApplied(100);
180             setLastLogEntry((MockRaftActorContext) context, 1, 100,
181                 new MockRaftActorContext.MockPayload(""));
182             ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
183
184             List<ReplicatedLogEntry> entries =
185                 Arrays.asList(
186                         (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
187                                 new MockRaftActorContext.MockPayload("foo"))
188                 );
189
190             // The new commitIndex is 101
191             AppendEntries appendEntries =
192                 new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
193
194             RaftActorBehavior raftBehavior =
195                 createBehavior(context).handleMessage(getRef(), appendEntries);
196
197             assertEquals(101L, context.getLastApplied());
198
199         }};
200     }
201
202     /**
203      * This test verifies that when an AppendEntries is received a specific prevLogTerm
204      * which does not match the term that is in RaftActors log entry at prevLogIndex
205      * then the RaftActor does not change it's state and it returns a failure.
206      *
207      * @throws Exception
208      */
209     @Test
210     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
211         throws Exception {
212         new JavaTestKit(getSystem()) {{
213
214             MockRaftActorContext context = createActorContext();
215
216             // First set the receivers term to lower number
217             context.getTermInformation().update(95, "test");
218
219             // Set the last log entry term for the receiver to be greater than
220             // what we will be sending as the prevLogTerm in AppendEntries
221             MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog =
222                 setLastLogEntry(context, 20, 0, new MockRaftActorContext.MockPayload(""));
223
224             // AppendEntries is now sent with a bigger term
225             // this will set the receivers term to be the same as the sender's term
226             AppendEntries appendEntries =
227                 new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
228
229             RaftActorBehavior behavior = createBehavior(context);
230
231             // Send an unknown message so that the state of the RaftActor remains unchanged
232             RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
233
234             RaftActorBehavior raftBehavior =
235                 behavior.handleMessage(getRef(), appendEntries);
236
237             assertEquals(expected, raftBehavior);
238
239             // Also expect an AppendEntriesReply to be sent where success is false
240             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
241                 "AppendEntriesReply") {
242                 // do not put code outside this method, will run afterwards
243                 @Override
244                 protected Boolean match(Object in) {
245                     if (in instanceof AppendEntriesReply) {
246                         AppendEntriesReply reply = (AppendEntriesReply) in;
247                         return reply.isSuccess();
248                     } else {
249                         throw noMatch();
250                     }
251                 }
252             }.get();
253
254             assertEquals(false, out);
255
256
257         }};
258     }
259
260
261
262     /**
263      * This test verifies that when a new AppendEntries message is received with
264      * new entries and the logs of the sender and receiver match that the new
265      * entries get added to the log and the log is incremented by the number of
266      * entries received in appendEntries
267      *
268      * @throws Exception
269      */
270     @Test
271     public void testHandleAppendEntriesAddNewEntries() throws Exception {
272         new JavaTestKit(getSystem()) {{
273
274             MockRaftActorContext context = createActorContext();
275
276             // First set the receivers term to lower number
277             context.getTermInformation().update(1, "test");
278
279             // Prepare the receivers log
280             MockRaftActorContext.SimpleReplicatedLog log =
281                 new MockRaftActorContext.SimpleReplicatedLog();
282             log.append(
283                 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
284             log.append(
285                 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
286             log.append(
287                 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
288
289             context.setReplicatedLog(log);
290
291             // Prepare the entries to be sent with AppendEntries
292             List<ReplicatedLogEntry> entries = new ArrayList<>();
293             entries.add(
294                 new MockRaftActorContext.MockReplicatedLogEntry(1, 3, new MockRaftActorContext.MockPayload("three")));
295             entries.add(
296                 new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("four")));
297
298             // Send appendEntries with the same term as was set on the receiver
299             // before the new behavior was created (1 in this case)
300             // This will not work for a Candidate because as soon as a Candidate
301             // is created it increments the term
302             AppendEntries appendEntries =
303                 new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
304
305             RaftActorBehavior behavior = createBehavior(context);
306
307             // Send an unknown message so that the state of the RaftActor remains unchanged
308             RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
309
310             RaftActorBehavior raftBehavior =
311                 behavior.handleMessage(getRef(), appendEntries);
312
313             assertEquals(expected, raftBehavior);
314             assertEquals(5, log.last().getIndex() + 1);
315             assertNotNull(log.get(3));
316             assertNotNull(log.get(4));
317
318             // Also expect an AppendEntriesReply to be sent where success is false
319             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
320                 "AppendEntriesReply") {
321                 // do not put code outside this method, will run afterwards
322                 @Override
323                 protected Boolean match(Object in) {
324                     if (in instanceof AppendEntriesReply) {
325                         AppendEntriesReply reply = (AppendEntriesReply) in;
326                         return reply.isSuccess();
327                     } else {
328                         throw noMatch();
329                     }
330                 }
331             }.get();
332
333             assertEquals(true, out);
334
335
336         }};
337     }
338
339
340
341     /**
342      * This test verifies that when a new AppendEntries message is received with
343      * new entries and the logs of the sender and receiver are out-of-sync that
344      * the log is first corrected by removing the out of sync entries from the
345      * log and then adding in the new entries sent with the AppendEntries message
346      *
347      * @throws Exception
348      */
349     @Test
350     public void testHandleAppendEntriesCorrectReceiverLogEntries()
351         throws Exception {
352         new JavaTestKit(getSystem()) {{
353
354             MockRaftActorContext context = createActorContext();
355
356             // First set the receivers term to lower number
357             context.getTermInformation().update(2, "test");
358
359             // Prepare the receivers log
360             MockRaftActorContext.SimpleReplicatedLog log =
361                 new MockRaftActorContext.SimpleReplicatedLog();
362             log.append(
363                 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
364             log.append(
365                 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
366             log.append(
367                 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
368
369             context.setReplicatedLog(log);
370
371             // Prepare the entries to be sent with AppendEntries
372             List<ReplicatedLogEntry> entries = new ArrayList<>();
373             entries.add(
374                 new MockRaftActorContext.MockReplicatedLogEntry(2, 2, new MockRaftActorContext.MockPayload("two-1")));
375             entries.add(
376                 new MockRaftActorContext.MockReplicatedLogEntry(2, 3, new MockRaftActorContext.MockPayload("three")));
377
378             // Send appendEntries with the same term as was set on the receiver
379             // before the new behavior was created (1 in this case)
380             // This will not work for a Candidate because as soon as a Candidate
381             // is created it increments the term
382             AppendEntries appendEntries =
383                 new AppendEntries(2, "leader-1", 1, 1, entries, 3, -1);
384
385             RaftActorBehavior behavior = createBehavior(context);
386
387             // Send an unknown message so that the state of the RaftActor remains unchanged
388             RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
389
390             RaftActorBehavior raftBehavior =
391                 behavior.handleMessage(getRef(), appendEntries);
392
393             assertEquals(expected, raftBehavior);
394
395             // The entry at index 2 will be found out-of-sync with the leader
396             // and will be removed
397             // Then the two new entries will be added to the log
398             // Thus making the log to have 4 entries
399             assertEquals(4, log.last().getIndex() + 1);
400             assertNotNull(log.get(2));
401
402             assertEquals("one", log.get(1).getData().toString());
403
404             // Check that the entry at index 2 has the new data
405             assertEquals("two-1", log.get(2).getData().toString());
406
407             assertEquals("three", log.get(3).getData().toString());
408
409             assertNotNull(log.get(3));
410
411             // Also expect an AppendEntriesReply to be sent where success is false
412             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
413                 "AppendEntriesReply") {
414                 // do not put code outside this method, will run afterwards
415                 @Override
416                 protected Boolean match(Object in) {
417                     if (in instanceof AppendEntriesReply) {
418                         AppendEntriesReply reply = (AppendEntriesReply) in;
419                         return reply.isSuccess();
420                     } else {
421                         throw noMatch();
422                     }
423                 }
424             }.get();
425
426             assertEquals(true, out);
427
428
429         }};
430     }
431
432     @Test
433     public void testHandleAppendEntriesPreviousLogEntryMissing(){
434         new JavaTestKit(getSystem()) {{
435
436             MockRaftActorContext context = createActorContext();
437
438             // Prepare the receivers log
439             MockRaftActorContext.SimpleReplicatedLog log =
440                     new MockRaftActorContext.SimpleReplicatedLog();
441             log.append(
442                     new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
443             log.append(
444                     new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
445             log.append(
446                     new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
447
448             context.setReplicatedLog(log);
449
450             // Prepare the entries to be sent with AppendEntries
451             List<ReplicatedLogEntry> entries = new ArrayList<>();
452             entries.add(
453                     new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
454
455             AppendEntries appendEntries =
456                     new AppendEntries(1, "leader-1", 3, 1, entries, 4, -1);
457
458             RaftActorBehavior behavior = createBehavior(context);
459
460             // Send an unknown message so that the state of the RaftActor remains unchanged
461             RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
462
463             RaftActorBehavior raftBehavior =
464                     behavior.handleMessage(getRef(), appendEntries);
465
466             assertEquals(expected, raftBehavior);
467
468             // Also expect an AppendEntriesReply to be sent where success is false
469             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
470                     "AppendEntriesReply") {
471                 // do not put code outside this method, will run afterwards
472                 @Override
473                 protected Boolean match(Object in) {
474                     if (in instanceof AppendEntriesReply) {
475                         AppendEntriesReply reply = (AppendEntriesReply) in;
476                         return reply.isSuccess();
477                     } else {
478                         throw noMatch();
479                     }
480                 }
481             }.get();
482
483             assertEquals(false, out);
484
485         }};
486
487     }
488
489     @Test
490     public void testHandleAppendAfterInstallingSnapshot(){
491         new JavaTestKit(getSystem()) {{
492
493             MockRaftActorContext context = createActorContext();
494
495
496             // Prepare the receivers log
497             MockRaftActorContext.SimpleReplicatedLog log =
498                     new MockRaftActorContext.SimpleReplicatedLog();
499
500             // Set up a log as if it has been snapshotted
501             log.setSnapshotIndex(3);
502             log.setSnapshotTerm(1);
503
504             context.setReplicatedLog(log);
505
506             // Prepare the entries to be sent with AppendEntries
507             List<ReplicatedLogEntry> entries = new ArrayList<>();
508             entries.add(
509                     new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
510
511             AppendEntries appendEntries =
512                     new AppendEntries(1, "leader-1", 3, 1, entries, 4, 3);
513
514             RaftActorBehavior behavior = createBehavior(context);
515
516             // Send an unknown message so that the state of the RaftActor remains unchanged
517             RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
518
519             RaftActorBehavior raftBehavior =
520                     behavior.handleMessage(getRef(), appendEntries);
521
522             assertEquals(expected, raftBehavior);
523
524             // Also expect an AppendEntriesReply to be sent where success is false
525             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
526                     "AppendEntriesReply") {
527                 // do not put code outside this method, will run afterwards
528                 @Override
529                 protected Boolean match(Object in) {
530                     if (in instanceof AppendEntriesReply) {
531                         AppendEntriesReply reply = (AppendEntriesReply) in;
532                         return reply.isSuccess();
533                     } else {
534                         throw noMatch();
535                     }
536                 }
537             }.get();
538
539             assertEquals(true, out);
540
541         }};
542
543     }
544
545
546     /**
547      * This test verifies that when InstallSnapshot is received by
548      * the follower its applied correctly.
549      *
550      * @throws Exception
551      */
552     @Test
553     public void testHandleInstallSnapshot() throws Exception {
554         JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{
555
556             ActorRef leaderActor = getSystem().actorOf(Props.create(
557                 MessageCollectorActor.class));
558
559             MockRaftActorContext context = createActorContext(getRef());
560
561             Follower follower = (Follower)createBehavior(context);
562
563             HashMap<String, String> followerSnapshot = new HashMap<>();
564             followerSnapshot.put("1", "A");
565             followerSnapshot.put("2", "B");
566             followerSnapshot.put("3", "C");
567
568             ByteString bsSnapshot  = toByteString(followerSnapshot);
569             ByteString chunkData = ByteString.EMPTY;
570             int offset = 0;
571             int snapshotLength = bsSnapshot.size();
572             int i = 1;
573             int chunkIndex = 1;
574
575             do {
576                 chunkData = getNextChunk(bsSnapshot, offset);
577                 final InstallSnapshot installSnapshot =
578                     new InstallSnapshot(1, "leader-1", i, 1,
579                         chunkData, chunkIndex, 3);
580                 follower.handleMessage(leaderActor, installSnapshot);
581                 offset = offset + 50;
582                 i++;
583                 chunkIndex++;
584             } while ((offset+50) < snapshotLength);
585
586             final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, chunkIndex, 3);
587             follower.handleMessage(leaderActor, installSnapshot3);
588
589             String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
590                 @Override
591                 protected String match(Object o) throws Exception {
592                     if (o instanceof ApplySnapshot) {
593                         ApplySnapshot as = (ApplySnapshot)o;
594                         if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) {
595                             return "applySnapshot-lastIndex-mismatch";
596                         }
597                         if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) {
598                             return "applySnapshot-lastAppliedTerm-mismatch";
599                         }
600                         if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) {
601                             return "applySnapshot-lastAppliedIndex-mismatch";
602                         }
603                         if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) {
604                             return "applySnapshot-lastTerm-mismatch";
605                         }
606                         return "applySnapshot";
607                     }
608
609                     return "ignoreCase";
610                 }
611             }.get();
612
613             // Verify that after a snapshot is successfully applied the collected snapshot chunks is reset to empty
614             assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
615
616             String applySnapshotMatch = "";
617             for (String reply: matches) {
618                 if (reply.startsWith("applySnapshot")) {
619                     applySnapshotMatch = reply;
620                 }
621             }
622
623             assertEquals("applySnapshot", applySnapshotMatch);
624
625             Object messages = executeLocalOperation(leaderActor, "get-all-messages");
626
627             assertNotNull(messages);
628             assertTrue(messages instanceof List);
629             List<Object> listMessages = (List<Object>) messages;
630
631             int installSnapshotReplyReceivedCount = 0;
632             for (Object message: listMessages) {
633                 if (message instanceof InstallSnapshotReply) {
634                     ++installSnapshotReplyReceivedCount;
635                 }
636             }
637
638             assertEquals(3, installSnapshotReplyReceivedCount);
639
640         }};
641     }
642
643     @Test
644     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
645         JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {
646             {
647
648                 ActorRef leaderActor = getSystem().actorOf(Props.create(
649                         MessageCollectorActor.class));
650
651                 MockRaftActorContext context = createActorContext(getRef());
652
653                 Follower follower = (Follower) createBehavior(context);
654
655                 HashMap<String, String> followerSnapshot = new HashMap<>();
656                 followerSnapshot.put("1", "A");
657                 followerSnapshot.put("2", "B");
658                 followerSnapshot.put("3", "C");
659
660                 ByteString bsSnapshot = toByteString(followerSnapshot);
661
662                 final InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader-1", 3, 1, getNextChunk(bsSnapshot, 10), 3, 3);
663                 follower.handleMessage(leaderActor, installSnapshot);
664
665                 Object messages = executeLocalOperation(leaderActor, "get-all-messages");
666
667                 assertNotNull(messages);
668                 assertTrue(messages instanceof List);
669                 List<Object> listMessages = (List<Object>) messages;
670
671                 int installSnapshotReplyReceivedCount = 0;
672                 for (Object message: listMessages) {
673                     if (message instanceof InstallSnapshotReply) {
674                         ++installSnapshotReplyReceivedCount;
675                     }
676                 }
677
678                 assertEquals(1, installSnapshotReplyReceivedCount);
679                 InstallSnapshotReply reply = (InstallSnapshotReply) listMessages.get(0);
680                 assertEquals(false, reply.isSuccess());
681                 assertEquals(-1, reply.getChunkIndex());
682                 assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
683
684
685             }};
686     }
687
688     public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
689         return MessageCollectorActor.getAllMessages(actor);
690     }
691
692     public ByteString getNextChunk (ByteString bs, int offset){
693         int snapshotLength = bs.size();
694         int start = offset;
695         int size = 50;
696         if (50 > snapshotLength) {
697             size = snapshotLength;
698         } else {
699             if ((start + 50) > snapshotLength) {
700                 size = snapshotLength - start;
701             }
702         }
703         return bs.substring(start, start + size);
704     }
705
706     private ByteString toByteString(Map<String, String> state) {
707         ByteArrayOutputStream b = null;
708         ObjectOutputStream o = null;
709         try {
710             try {
711                 b = new ByteArrayOutputStream();
712                 o = new ObjectOutputStream(b);
713                 o.writeObject(state);
714                 byte[] snapshotBytes = b.toByteArray();
715                 return ByteString.copyFrom(snapshotBytes);
716             } finally {
717                 if (o != null) {
718                     o.flush();
719                     o.close();
720                 }
721                 if (b != null) {
722                     b.close();
723                 }
724             }
725         } catch (IOException e) {
726             org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
727         }
728         return null;
729     }
730 }