Fix non-generic references to NormalizedNode
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.testkit.JavaTestKit;
6 import com.google.common.base.Optional;
7 import com.google.protobuf.ByteString;
8 import org.junit.Assert;
9 import org.junit.Test;
10 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
11 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
12 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
13 import org.opendaylight.controller.cluster.raft.RaftActorContext;
14 import org.opendaylight.controller.cluster.raft.RaftState;
15 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
16 import org.opendaylight.controller.cluster.raft.SerializationUtils;
17 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
18 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
19 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
20 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
21 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
22 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
23 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
24 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
25 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
26 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
27 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
28 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
29 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
30 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
31 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
32 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
33 import scala.concurrent.duration.FiniteDuration;
34
35 import java.io.ByteArrayOutputStream;
36 import java.io.IOException;
37 import java.io.ObjectOutputStream;
38 import java.util.HashMap;
39 import java.util.List;
40 import java.util.Map;
41 import java.util.concurrent.TimeUnit;
42
43 import static org.junit.Assert.assertEquals;
44 import static org.junit.Assert.assertNotNull;
45 import static org.junit.Assert.assertTrue;
46
47 public class LeaderTest extends AbstractRaftActorBehaviorTest {
48
49     private ActorRef leaderActor =
50         getSystem().actorOf(Props.create(DoNothingActor.class));
51     private ActorRef senderActor =
52         getSystem().actorOf(Props.create(DoNothingActor.class));
53
54     @Test
55     public void testHandleMessageForUnknownMessage() throws Exception {
56         new JavaTestKit(getSystem()) {{
57             Leader leader =
58                 new Leader(createActorContext());
59
60             // handle message should return the Leader state when it receives an
61             // unknown message
62             RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
63             Assert.assertTrue(behavior instanceof Leader);
64         }};
65     }
66
67
68     @Test
69     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
70         new JavaTestKit(getSystem()) {{
71
72             new Within(duration("1 seconds")) {
73                 protected void run() {
74
75                     ActorRef followerActor = getTestActor();
76
77                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
78
79                     Map<String, String> peerAddresses = new HashMap();
80
81                     peerAddresses.put(followerActor.path().toString(),
82                         followerActor.path().toString());
83
84                     actorContext.setPeerAddresses(peerAddresses);
85
86                     Leader leader = new Leader(actorContext);
87                     leader.handleMessage(senderActor, new SendHeartBeat());
88
89                     final String out =
90                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
91                             // do not put code outside this method, will run afterwards
92                             protected String match(Object in) {
93                                 Object msg = fromSerializableMessage(in);
94                                 if (msg instanceof AppendEntries) {
95                                     if (((AppendEntries)msg).getTerm() == 0) {
96                                         return "match";
97                                     }
98                                     return null;
99                                 } else {
100                                     throw noMatch();
101                                 }
102                             }
103                         }.get(); // this extracts the received message
104
105                     assertEquals("match", out);
106
107                 }
108             };
109         }};
110     }
111
112     @Test
113     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
114         new JavaTestKit(getSystem()) {{
115
116             new Within(duration("1 seconds")) {
117                 protected void run() {
118
119                     ActorRef followerActor = getTestActor();
120
121                     MockRaftActorContext actorContext =
122                         (MockRaftActorContext) createActorContext();
123
124                     Map<String, String> peerAddresses = new HashMap();
125
126                     peerAddresses.put(followerActor.path().toString(),
127                         followerActor.path().toString());
128
129                     actorContext.setPeerAddresses(peerAddresses);
130
131                     Leader leader = new Leader(actorContext);
132                     RaftActorBehavior raftBehavior = leader
133                         .handleMessage(senderActor, new Replicate(null, null,
134                             new MockRaftActorContext.MockReplicatedLogEntry(1,
135                                 100,
136                                 new MockRaftActorContext.MockPayload("foo"))
137                         ));
138
139                     // State should not change
140                     assertTrue(raftBehavior instanceof Leader);
141
142                     final String out =
143                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
144                             // do not put code outside this method, will run afterwards
145                             protected String match(Object in) {
146                                 Object msg = fromSerializableMessage(in);
147                                 if (msg instanceof AppendEntries) {
148                                     if (((AppendEntries)msg).getTerm() == 0) {
149                                         return "match";
150                                     }
151                                     return null;
152                                 } else {
153                                     throw noMatch();
154                                 }
155                             }
156                         }.get(); // this extracts the received message
157
158                     assertEquals("match", out);
159                 }
160             };
161         }};
162     }
163
164     @Test
165     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
166         new JavaTestKit(getSystem()) {{
167
168             new Within(duration("1 seconds")) {
169                 protected void run() {
170
171                     ActorRef raftActor = getTestActor();
172
173                     MockRaftActorContext actorContext =
174                         new MockRaftActorContext("test", getSystem(), raftActor);
175
176                     actorContext.getReplicatedLog().removeFrom(0);
177
178                     actorContext.setReplicatedLog(
179                         new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
180                             .build());
181
182                     Leader leader = new Leader(actorContext);
183                     RaftActorBehavior raftBehavior = leader
184                         .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
185
186                     // State should not change
187                     assertTrue(raftBehavior instanceof Leader);
188
189                     assertEquals(1, actorContext.getCommitIndex());
190
191                     final String out =
192                         new ExpectMsg<String>(duration("1 seconds"),
193                             "match hint") {
194                             // do not put code outside this method, will run afterwards
195                             protected String match(Object in) {
196                                 if (in instanceof ApplyState) {
197                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
198                                         return "match";
199                                     }
200                                     return null;
201                                 } else {
202                                     throw noMatch();
203                                 }
204                             }
205                         }.get(); // this extracts the received message
206
207                     assertEquals("match", out);
208
209                 }
210             };
211         }};
212     }
213
214     @Test
215     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
216         new JavaTestKit(getSystem()) {{
217             ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
218
219             Map<String, String> peerAddresses = new HashMap();
220             peerAddresses.put(followerActor.path().toString(),
221                 followerActor.path().toString());
222
223             MockRaftActorContext actorContext =
224                 (MockRaftActorContext) createActorContext(leaderActor);
225             actorContext.setPeerAddresses(peerAddresses);
226
227             Map<String, String> leadersSnapshot = new HashMap<>();
228             leadersSnapshot.put("1", "A");
229             leadersSnapshot.put("2", "B");
230             leadersSnapshot.put("3", "C");
231
232             //clears leaders log
233             actorContext.getReplicatedLog().removeFrom(0);
234
235             final int followersLastIndex = 2;
236             final int snapshotIndex = 3;
237             final int newEntryIndex = 4;
238             final int snapshotTerm = 1;
239             final int currentTerm = 2;
240
241             // set the snapshot variables in replicatedlog
242             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
243             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
244             actorContext.setCommitIndex(followersLastIndex);
245             //set follower timeout to 2 mins, helps during debugging
246             actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
247
248             MockLeader leader = new MockLeader(actorContext);
249
250             // new entry
251             ReplicatedLogImplEntry entry =
252                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
253                     new MockRaftActorContext.MockPayload("D"));
254
255             //update follower timestamp
256             leader.markFollowerActive(followerActor.path().toString());
257
258             ByteString bs = toByteString(leadersSnapshot);
259             leader.setSnapshot(Optional.of(bs));
260             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
261
262             //send first chunk and no InstallSnapshotReply received yet
263             leader.getFollowerToSnapshot().getNextChunk();
264             leader.getFollowerToSnapshot().incrementChunkIndex();
265
266             leader.handleMessage(leaderActor, new SendHeartBeat());
267
268             AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
269                 followerActor, AppendEntries.SERIALIZABLE_CLASS);
270
271             assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
272                 "received", aeproto);
273
274             AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
275
276             assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
277
278             //InstallSnapshotReply received
279             leader.getFollowerToSnapshot().markSendStatus(true);
280
281             leader.handleMessage(senderActor, new SendHeartBeat());
282
283             InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
284                 MessageCollectorActor.getFirstMatching(followerActor,
285                     InstallSnapshot.SERIALIZABLE_CLASS);
286
287             assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
288                 isproto);
289
290             InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
291
292             assertEquals(snapshotIndex, is.getLastIncludedIndex());
293
294         }};
295     }
296
297     @Test
298     public void testSendAppendEntriesSnapshotScenario() {
299         new JavaTestKit(getSystem()) {{
300
301             ActorRef followerActor = getTestActor();
302
303             Map<String, String> peerAddresses = new HashMap();
304             peerAddresses.put(followerActor.path().toString(),
305                 followerActor.path().toString());
306
307             MockRaftActorContext actorContext =
308                 (MockRaftActorContext) createActorContext(getRef());
309             actorContext.setPeerAddresses(peerAddresses);
310
311             Map<String, String> leadersSnapshot = new HashMap<>();
312             leadersSnapshot.put("1", "A");
313             leadersSnapshot.put("2", "B");
314             leadersSnapshot.put("3", "C");
315
316             //clears leaders log
317             actorContext.getReplicatedLog().removeFrom(0);
318
319             final int followersLastIndex = 2;
320             final int snapshotIndex = 3;
321             final int newEntryIndex = 4;
322             final int snapshotTerm = 1;
323             final int currentTerm = 2;
324
325             // set the snapshot variables in replicatedlog
326             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
327             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
328             actorContext.setCommitIndex(followersLastIndex);
329
330             Leader leader = new Leader(actorContext);
331
332             // new entry
333             ReplicatedLogImplEntry entry =
334                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
335                     new MockRaftActorContext.MockPayload("D"));
336
337             //update follower timestamp
338             leader.markFollowerActive(followerActor.path().toString());
339
340             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
341             RaftActorBehavior raftBehavior = leader.handleMessage(
342                 senderActor, new Replicate(null, "state-id", entry));
343
344             assertTrue(raftBehavior instanceof Leader);
345
346             // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
347             Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
348                 @Override
349                 protected Boolean match(Object o) throws Exception {
350                     if (o instanceof InitiateInstallSnapshot) {
351                         return true;
352                     }
353                     return false;
354                 }
355             }.get();
356
357             boolean initiateInitiateInstallSnapshot = false;
358             for (Boolean b: matches) {
359                 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
360             }
361
362             assertTrue(initiateInitiateInstallSnapshot);
363         }};
364     }
365
366     @Test
367     public void testInitiateInstallSnapshot() throws Exception {
368         new JavaTestKit(getSystem()) {{
369
370             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
371
372             ActorRef followerActor = getTestActor();
373
374             Map<String, String> peerAddresses = new HashMap();
375             peerAddresses.put(followerActor.path().toString(),
376                 followerActor.path().toString());
377
378
379             MockRaftActorContext actorContext =
380                 (MockRaftActorContext) createActorContext(leaderActor);
381             actorContext.setPeerAddresses(peerAddresses);
382
383             Map<String, String> leadersSnapshot = new HashMap<>();
384             leadersSnapshot.put("1", "A");
385             leadersSnapshot.put("2", "B");
386             leadersSnapshot.put("3", "C");
387
388             //clears leaders log
389             actorContext.getReplicatedLog().removeFrom(0);
390
391             final int followersLastIndex = 2;
392             final int snapshotIndex = 3;
393             final int newEntryIndex = 4;
394             final int snapshotTerm = 1;
395             final int currentTerm = 2;
396
397             // set the snapshot variables in replicatedlog
398             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
399             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
400             actorContext.setLastApplied(3);
401             actorContext.setCommitIndex(followersLastIndex);
402
403             Leader leader = new Leader(actorContext);
404             // set the snapshot as absent and check if capture-snapshot is invoked.
405             leader.setSnapshot(Optional.<ByteString>absent());
406
407             // new entry
408             ReplicatedLogImplEntry entry =
409                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
410                     new MockRaftActorContext.MockPayload("D"));
411
412             actorContext.getReplicatedLog().append(entry);
413
414             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
415             RaftActorBehavior raftBehavior = leader.handleMessage(
416                 leaderActor, new InitiateInstallSnapshot());
417
418             CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
419                 getFirstMatching(leaderActor, CaptureSnapshot.class);
420
421             assertNotNull(cs);
422
423             assertTrue(cs.isInstallSnapshotInitiated());
424             assertEquals(3, cs.getLastAppliedIndex());
425             assertEquals(1, cs.getLastAppliedTerm());
426             assertEquals(4, cs.getLastIndex());
427             assertEquals(2, cs.getLastTerm());
428         }};
429     }
430
431     @Test
432     public void testInstallSnapshot() {
433         new JavaTestKit(getSystem()) {{
434
435             ActorRef followerActor = getTestActor();
436
437             Map<String, String> peerAddresses = new HashMap();
438             peerAddresses.put(followerActor.path().toString(),
439                 followerActor.path().toString());
440
441             MockRaftActorContext actorContext =
442                 (MockRaftActorContext) createActorContext();
443             actorContext.setPeerAddresses(peerAddresses);
444
445
446             Map<String, String> leadersSnapshot = new HashMap<>();
447             leadersSnapshot.put("1", "A");
448             leadersSnapshot.put("2", "B");
449             leadersSnapshot.put("3", "C");
450
451             //clears leaders log
452             actorContext.getReplicatedLog().removeFrom(0);
453
454             final int followersLastIndex = 2;
455             final int snapshotIndex = 3;
456             final int newEntryIndex = 4;
457             final int snapshotTerm = 1;
458             final int currentTerm = 2;
459
460             // set the snapshot variables in replicatedlog
461             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
462             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
463             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
464             actorContext.setCommitIndex(followersLastIndex);
465
466             Leader leader = new Leader(actorContext);
467
468             // new entry
469             ReplicatedLogImplEntry entry =
470                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
471                     new MockRaftActorContext.MockPayload("D"));
472
473             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
474                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
475
476             assertTrue(raftBehavior instanceof Leader);
477
478             // check if installsnapshot gets called with the correct values.
479             final String out =
480                 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
481                     // do not put code outside this method, will run afterwards
482                     protected String match(Object in) {
483                         if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
484                             InstallSnapshot is = (InstallSnapshot)
485                                 SerializationUtils.fromSerializable(in);
486                             if (is.getData() == null) {
487                                 return "InstallSnapshot data is null";
488                             }
489                             if (is.getLastIncludedIndex() != snapshotIndex) {
490                                 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
491                             }
492                             if (is.getLastIncludedTerm() != snapshotTerm) {
493                                 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
494                             }
495                             if (is.getTerm() == currentTerm) {
496                                 return is.getTerm() + "!=" + currentTerm;
497                             }
498
499                             return "match";
500
501                         } else {
502                             return "message mismatch:" + in.getClass();
503                         }
504                     }
505                 }.get(); // this extracts the received message
506
507             assertEquals("match", out);
508         }};
509     }
510
511     @Test
512     public void testHandleInstallSnapshotReplyLastChunk() {
513         new JavaTestKit(getSystem()) {{
514
515             ActorRef followerActor = getTestActor();
516
517             Map<String, String> peerAddresses = new HashMap();
518             peerAddresses.put(followerActor.path().toString(),
519                 followerActor.path().toString());
520
521             final int followersLastIndex = 2;
522             final int snapshotIndex = 3;
523             final int newEntryIndex = 4;
524             final int snapshotTerm = 1;
525             final int currentTerm = 2;
526
527             MockRaftActorContext actorContext =
528                 (MockRaftActorContext) createActorContext();
529             actorContext.setPeerAddresses(peerAddresses);
530             actorContext.setCommitIndex(followersLastIndex);
531
532             MockLeader leader = new MockLeader(actorContext);
533
534             Map<String, String> leadersSnapshot = new HashMap<>();
535             leadersSnapshot.put("1", "A");
536             leadersSnapshot.put("2", "B");
537             leadersSnapshot.put("3", "C");
538
539             // set the snapshot variables in replicatedlog
540
541             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
542             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
543             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
544
545             ByteString bs = toByteString(leadersSnapshot);
546             leader.setSnapshot(Optional.of(bs));
547             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
548             while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
549                 leader.getFollowerToSnapshot().getNextChunk();
550                 leader.getFollowerToSnapshot().incrementChunkIndex();
551             }
552
553             //clears leaders log
554             actorContext.getReplicatedLog().removeFrom(0);
555
556             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
557                 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
558                     leader.getFollowerToSnapshot().getChunkIndex(), true));
559
560             assertTrue(raftBehavior instanceof Leader);
561
562             assertEquals(leader.mapFollowerToSnapshot.size(), 0);
563             assertEquals(leader.followerToLog.size(), 1);
564             assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
565             FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
566             assertEquals(snapshotIndex, fli.getMatchIndex().get());
567             assertEquals(snapshotIndex, fli.getMatchIndex().get());
568             assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
569         }};
570     }
571
572     @Test
573     public void testFollowerToSnapshotLogic() {
574
575         MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
576
577         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
578             @Override
579             public int getSnapshotChunkSize() {
580                 return 50;
581             }
582         });
583
584         MockLeader leader = new MockLeader(actorContext);
585
586         Map<String, String> leadersSnapshot = new HashMap<>();
587         leadersSnapshot.put("1", "A");
588         leadersSnapshot.put("2", "B");
589         leadersSnapshot.put("3", "C");
590
591         ByteString bs = toByteString(leadersSnapshot);
592         byte[] barray = bs.toByteArray();
593
594         leader.createFollowerToSnapshot("followerId", bs);
595         assertEquals(bs.size(), barray.length);
596
597         int chunkIndex=0;
598         for (int i=0; i < barray.length; i = i + 50) {
599             int j = i + 50;
600             chunkIndex++;
601
602             if (i + 50 > barray.length) {
603                 j = barray.length;
604             }
605
606             ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
607             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
608             assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
609
610             leader.getFollowerToSnapshot().markSendStatus(true);
611             if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
612                 leader.getFollowerToSnapshot().incrementChunkIndex();
613             }
614         }
615
616         assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
617     }
618
619
620     @Override protected RaftActorBehavior createBehavior(
621         RaftActorContext actorContext) {
622         return new Leader(actorContext);
623     }
624
625     @Override protected RaftActorContext createActorContext() {
626         return createActorContext(leaderActor);
627     }
628
629     protected RaftActorContext createActorContext(ActorRef actorRef) {
630         return new MockRaftActorContext("test", getSystem(), actorRef);
631     }
632
633     private ByteString toByteString(Map<String, String> state) {
634         ByteArrayOutputStream b = null;
635         ObjectOutputStream o = null;
636         try {
637             try {
638                 b = new ByteArrayOutputStream();
639                 o = new ObjectOutputStream(b);
640                 o.writeObject(state);
641                 byte[] snapshotBytes = b.toByteArray();
642                 return ByteString.copyFrom(snapshotBytes);
643             } finally {
644                 if (o != null) {
645                     o.flush();
646                     o.close();
647                 }
648                 if (b != null) {
649                     b.close();
650                 }
651             }
652         } catch (IOException e) {
653             Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
654         }
655         return null;
656     }
657
658     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
659         private static AbstractRaftActorBehavior behavior;
660
661         public ForwardMessageToBehaviorActor(){
662
663         }
664
665         @Override public void onReceive(Object message) throws Exception {
666             super.onReceive(message);
667             behavior.handleMessage(sender(), message);
668         }
669
670         public static void setBehavior(AbstractRaftActorBehavior behavior){
671             ForwardMessageToBehaviorActor.behavior = behavior;
672         }
673     }
674
675     @Test
676     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
677         new JavaTestKit(getSystem()) {{
678
679             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
680
681             MockRaftActorContext leaderActorContext =
682                 new MockRaftActorContext("leader", getSystem(), leaderActor);
683
684             ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
685
686             MockRaftActorContext followerActorContext =
687                 new MockRaftActorContext("follower", getSystem(), followerActor);
688
689             Follower follower = new Follower(followerActorContext);
690
691             ForwardMessageToBehaviorActor.setBehavior(follower);
692
693             Map<String, String> peerAddresses = new HashMap();
694             peerAddresses.put(followerActor.path().toString(),
695                 followerActor.path().toString());
696
697             leaderActorContext.setPeerAddresses(peerAddresses);
698
699             leaderActorContext.getReplicatedLog().removeFrom(0);
700
701             //create 3 entries
702             leaderActorContext.setReplicatedLog(
703                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
704
705             leaderActorContext.setCommitIndex(1);
706
707             followerActorContext.getReplicatedLog().removeFrom(0);
708
709             // follower too has the exact same log entries and has the same commit index
710             followerActorContext.setReplicatedLog(
711                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
712
713             followerActorContext.setCommitIndex(1);
714
715             Leader leader = new Leader(leaderActorContext);
716             leader.markFollowerActive(followerActor.path().toString());
717
718             leader.handleMessage(leaderActor, new SendHeartBeat());
719
720             AppendEntriesMessages.AppendEntries appendEntries =
721                 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
722                     .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
723
724             assertNotNull(appendEntries);
725
726             assertEquals(1, appendEntries.getLeaderCommit());
727             assertEquals(1, appendEntries.getLogEntries(0).getIndex());
728             assertEquals(0, appendEntries.getPrevLogIndex());
729
730             AppendEntriesReply appendEntriesReply =
731                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
732                     leaderActor, AppendEntriesReply.class);
733
734             assertNotNull(appendEntriesReply);
735
736             // follower returns its next index
737             assertEquals(2, appendEntriesReply.getLogLastIndex());
738             assertEquals(1, appendEntriesReply.getLogLastTerm());
739
740         }};
741     }
742
743
744     @Test
745     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
746         new JavaTestKit(getSystem()) {{
747
748             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
749
750             MockRaftActorContext leaderActorContext =
751                 new MockRaftActorContext("leader", getSystem(), leaderActor);
752
753             ActorRef followerActor = getSystem().actorOf(
754                 Props.create(ForwardMessageToBehaviorActor.class));
755
756             MockRaftActorContext followerActorContext =
757                 new MockRaftActorContext("follower", getSystem(), followerActor);
758
759             Follower follower = new Follower(followerActorContext);
760
761             ForwardMessageToBehaviorActor.setBehavior(follower);
762
763             Map<String, String> peerAddresses = new HashMap();
764             peerAddresses.put(followerActor.path().toString(),
765                 followerActor.path().toString());
766
767             leaderActorContext.setPeerAddresses(peerAddresses);
768
769             leaderActorContext.getReplicatedLog().removeFrom(0);
770
771             leaderActorContext.setReplicatedLog(
772                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
773
774             leaderActorContext.setCommitIndex(1);
775
776             followerActorContext.getReplicatedLog().removeFrom(0);
777
778             followerActorContext.setReplicatedLog(
779                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
780
781             // follower has the same log entries but its commit index > leaders commit index
782             followerActorContext.setCommitIndex(2);
783
784             Leader leader = new Leader(leaderActorContext);
785             leader.markFollowerActive(followerActor.path().toString());
786
787             leader.handleMessage(leaderActor, new SendHeartBeat());
788
789             AppendEntriesMessages.AppendEntries appendEntries =
790                 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
791                     .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
792
793             assertNotNull(appendEntries);
794
795             assertEquals(1, appendEntries.getLeaderCommit());
796             assertEquals(1, appendEntries.getLogEntries(0).getIndex());
797             assertEquals(0, appendEntries.getPrevLogIndex());
798
799             AppendEntriesReply appendEntriesReply =
800                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
801                     leaderActor, AppendEntriesReply.class);
802
803             assertNotNull(appendEntriesReply);
804
805             assertEquals(2, appendEntriesReply.getLogLastIndex());
806             assertEquals(1, appendEntriesReply.getLogLastTerm());
807
808         }};
809     }
810
811     @Test
812     public void testHandleAppendEntriesReplyFailure(){
813         new JavaTestKit(getSystem()) {
814             {
815
816                 ActorRef leaderActor =
817                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
818
819                 ActorRef followerActor =
820                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
821
822
823                 MockRaftActorContext leaderActorContext =
824                     new MockRaftActorContext("leader", getSystem(), leaderActor);
825
826                 Map<String, String> peerAddresses = new HashMap();
827                 peerAddresses.put("follower-1",
828                     followerActor.path().toString());
829
830                 leaderActorContext.setPeerAddresses(peerAddresses);
831
832                 Leader leader = new Leader(leaderActorContext);
833
834                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
835
836                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
837
838                 assertEquals(RaftState.Leader, raftActorBehavior.state());
839
840             }};
841     }
842
843     @Test
844     public void testHandleAppendEntriesReplySuccess() throws Exception {
845         new JavaTestKit(getSystem()) {
846             {
847
848                 ActorRef leaderActor =
849                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
850
851                 ActorRef followerActor =
852                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
853
854
855                 MockRaftActorContext leaderActorContext =
856                     new MockRaftActorContext("leader", getSystem(), leaderActor);
857
858                 leaderActorContext.setReplicatedLog(
859                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
860
861                 Map<String, String> peerAddresses = new HashMap();
862                 peerAddresses.put("follower-1",
863                     followerActor.path().toString());
864
865                 leaderActorContext.setPeerAddresses(peerAddresses);
866                 leaderActorContext.setCommitIndex(1);
867                 leaderActorContext.setLastApplied(1);
868                 leaderActorContext.getTermInformation().update(1, "leader");
869
870                 Leader leader = new Leader(leaderActorContext);
871
872                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
873
874                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
875
876                 assertEquals(RaftState.Leader, raftActorBehavior.state());
877
878                 assertEquals(2, leaderActorContext.getCommitIndex());
879
880                 ApplyLogEntries applyLogEntries =
881                     (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
882                         ApplyLogEntries.class);
883
884                 assertNotNull(applyLogEntries);
885
886                 assertEquals(2, leaderActorContext.getLastApplied());
887
888                 assertEquals(2, applyLogEntries.getToIndex());
889
890                 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
891                     ApplyState.class);
892
893                 assertEquals(1,applyStateList.size());
894
895                 ApplyState applyState = (ApplyState) applyStateList.get(0);
896
897                 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
898
899             }};
900     }
901
902     @Test
903     public void testHandleAppendEntriesReplyUnknownFollower(){
904         new JavaTestKit(getSystem()) {
905             {
906
907                 ActorRef leaderActor =
908                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
909
910                 MockRaftActorContext leaderActorContext =
911                     new MockRaftActorContext("leader", getSystem(), leaderActor);
912
913                 Leader leader = new Leader(leaderActorContext);
914
915                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
916
917                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
918
919                 assertEquals(RaftState.Leader, raftActorBehavior.state());
920
921             }};
922     }
923
924     @Test
925     public void testHandleRequestVoteReply(){
926         new JavaTestKit(getSystem()) {
927             {
928
929                 ActorRef leaderActor =
930                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
931
932                 MockRaftActorContext leaderActorContext =
933                     new MockRaftActorContext("leader", getSystem(), leaderActor);
934
935                 Leader leader = new Leader(leaderActorContext);
936
937                 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
938
939                 assertEquals(RaftState.Leader, raftActorBehavior.state());
940
941                 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
942
943                 assertEquals(RaftState.Leader, raftActorBehavior.state());
944
945
946             }};
947
948     }
949
950     class MockLeader extends Leader {
951
952         FollowerToSnapshot fts;
953
954         public MockLeader(RaftActorContext context){
955             super(context);
956         }
957
958         public FollowerToSnapshot getFollowerToSnapshot() {
959             return fts;
960         }
961
962         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
963             fts = new FollowerToSnapshot(bs);
964             mapFollowerToSnapshot.put(followerId, fts);
965
966         }
967     }
968
969     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
970
971         private long electionTimeOutIntervalMillis;
972         private int snapshotChunkSize;
973
974         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
975             super();
976             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
977             this.snapshotChunkSize = snapshotChunkSize;
978         }
979
980         @Override
981         public FiniteDuration getElectionTimeOutInterval() {
982             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
983         }
984
985         @Override
986         public int getSnapshotChunkSize() {
987             return snapshotChunkSize;
988         }
989     }
990 }