BUG-2599 Netconf notification manager initial API and Impl
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Assert;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftState;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
47 import scala.concurrent.duration.FiniteDuration;
48
49 public class LeaderTest extends AbstractRaftActorBehaviorTest {
50
51     private final ActorRef leaderActor =
52         getSystem().actorOf(Props.create(DoNothingActor.class));
53     private final ActorRef senderActor =
54         getSystem().actorOf(Props.create(DoNothingActor.class));
55
56     @Test
57     public void testHandleMessageForUnknownMessage() throws Exception {
58         new JavaTestKit(getSystem()) {{
59             Leader leader =
60                 new Leader(createActorContext());
61
62             // handle message should return the Leader state when it receives an
63             // unknown message
64             RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
65             Assert.assertTrue(behavior instanceof Leader);
66         }};
67     }
68
69     @Test
70     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
71         new JavaTestKit(getSystem()) {{
72
73             new Within(duration("1 seconds")) {
74                 @Override
75                 protected void run() {
76
77                     ActorRef followerActor = getTestActor();
78
79                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
80
81                     Map<String, String> peerAddresses = new HashMap<>();
82
83                     peerAddresses.put(followerActor.path().toString(),
84                         followerActor.path().toString());
85
86                     actorContext.setPeerAddresses(peerAddresses);
87
88                     Leader leader = new Leader(actorContext);
89                     leader.handleMessage(senderActor, new SendHeartBeat());
90
91                     final String out =
92                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
93                             // do not put code outside this method, will run afterwards
94                             @Override
95                             protected String match(Object in) {
96                                 Object msg = fromSerializableMessage(in);
97                                 if (msg instanceof AppendEntries) {
98                                     if (((AppendEntries)msg).getTerm() == 0) {
99                                         return "match";
100                                     }
101                                     return null;
102                                 } else {
103                                     throw noMatch();
104                                 }
105                             }
106                         }.get(); // this extracts the received message
107
108                     assertEquals("match", out);
109
110                 }
111             };
112         }};
113     }
114
115     @Test
116     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
117         new JavaTestKit(getSystem()) {{
118
119             new Within(duration("1 seconds")) {
120                 @Override
121                 protected void run() {
122
123                     ActorRef followerActor = getTestActor();
124
125                     MockRaftActorContext actorContext =
126                         (MockRaftActorContext) createActorContext();
127
128                     Map<String, String> peerAddresses = new HashMap<>();
129
130                     peerAddresses.put(followerActor.path().toString(),
131                             followerActor.path().toString());
132
133                     actorContext.setPeerAddresses(peerAddresses);
134
135                     Leader leader = new Leader(actorContext);
136                     RaftActorBehavior raftBehavior = leader
137                         .handleMessage(senderActor, new Replicate(null, null,
138                             new MockRaftActorContext.MockReplicatedLogEntry(1,
139                                 100,
140                                 new MockRaftActorContext.MockPayload("foo"))
141                         ));
142
143                     // State should not change
144                     assertTrue(raftBehavior instanceof Leader);
145
146                     final String out =
147                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
148                             // do not put code outside this method, will run afterwards
149                             @Override
150                             protected String match(Object in) {
151                                 Object msg = fromSerializableMessage(in);
152                                 if (msg instanceof AppendEntries) {
153                                     if (((AppendEntries)msg).getTerm() == 0) {
154                                         return "match";
155                                     }
156                                     return null;
157                                 } else {
158                                     throw noMatch();
159                                 }
160                             }
161                         }.get(); // this extracts the received message
162
163                     assertEquals("match", out);
164                 }
165             };
166         }};
167     }
168
169     @Test
170     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
171         new JavaTestKit(getSystem()) {{
172
173             new Within(duration("1 seconds")) {
174                 @Override
175                 protected void run() {
176
177                     ActorRef raftActor = getTestActor();
178
179                     MockRaftActorContext actorContext =
180                         new MockRaftActorContext("test", getSystem(), raftActor);
181
182                     actorContext.getReplicatedLog().removeFrom(0);
183
184                     actorContext.setReplicatedLog(
185                         new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
186                             .build());
187
188                     Leader leader = new Leader(actorContext);
189                     RaftActorBehavior raftBehavior = leader
190                         .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
191
192                     // State should not change
193                     assertTrue(raftBehavior instanceof Leader);
194
195                     assertEquals(1, actorContext.getCommitIndex());
196
197                     final String out =
198                         new ExpectMsg<String>(duration("1 seconds"),
199                             "match hint") {
200                             // do not put code outside this method, will run afterwards
201                             @Override
202                             protected String match(Object in) {
203                                 if (in instanceof ApplyState) {
204                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
205                                         return "match";
206                                     }
207                                     return null;
208                                 } else {
209                                     throw noMatch();
210                                 }
211                             }
212                         }.get(); // this extracts the received message
213
214                     assertEquals("match", out);
215
216                 }
217             };
218         }};
219     }
220
221     @Test
222     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
223         new JavaTestKit(getSystem()) {{
224             ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
225
226             Map<String, String> peerAddresses = new HashMap<>();
227             peerAddresses.put(followerActor.path().toString(),
228                 followerActor.path().toString());
229
230             MockRaftActorContext actorContext =
231                 (MockRaftActorContext) createActorContext(leaderActor);
232             actorContext.setPeerAddresses(peerAddresses);
233
234             Map<String, String> leadersSnapshot = new HashMap<>();
235             leadersSnapshot.put("1", "A");
236             leadersSnapshot.put("2", "B");
237             leadersSnapshot.put("3", "C");
238
239             //clears leaders log
240             actorContext.getReplicatedLog().removeFrom(0);
241
242             final int followersLastIndex = 2;
243             final int snapshotIndex = 3;
244             final int newEntryIndex = 4;
245             final int snapshotTerm = 1;
246             final int currentTerm = 2;
247
248             // set the snapshot variables in replicatedlog
249             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
250             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
251             actorContext.setCommitIndex(followersLastIndex);
252             //set follower timeout to 2 mins, helps during debugging
253             actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
254
255             MockLeader leader = new MockLeader(actorContext);
256
257             // new entry
258             ReplicatedLogImplEntry entry =
259                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
260                     new MockRaftActorContext.MockPayload("D"));
261
262             //update follower timestamp
263             leader.markFollowerActive(followerActor.path().toString());
264
265             ByteString bs = toByteString(leadersSnapshot);
266             leader.setSnapshot(Optional.of(bs));
267             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
268
269             //send first chunk and no InstallSnapshotReply received yet
270             leader.getFollowerToSnapshot().getNextChunk();
271             leader.getFollowerToSnapshot().incrementChunkIndex();
272
273             leader.handleMessage(leaderActor, new SendHeartBeat());
274
275             AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
276                 followerActor, AppendEntries.class);
277
278             assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
279                 "received", aeproto);
280
281             AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
282
283             assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
284
285             //InstallSnapshotReply received
286             leader.getFollowerToSnapshot().markSendStatus(true);
287
288             leader.handleMessage(senderActor, new SendHeartBeat());
289
290             InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
291                 MessageCollectorActor.getFirstMatching(followerActor,
292                     InstallSnapshot.SERIALIZABLE_CLASS);
293
294             assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
295                 isproto);
296
297             InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
298
299             assertEquals(snapshotIndex, is.getLastIncludedIndex());
300
301         }};
302     }
303
304     @Test
305     public void testSendAppendEntriesSnapshotScenario() {
306         new JavaTestKit(getSystem()) {{
307
308             ActorRef followerActor = getTestActor();
309
310             Map<String, String> peerAddresses = new HashMap<>();
311             peerAddresses.put(followerActor.path().toString(),
312                 followerActor.path().toString());
313
314             MockRaftActorContext actorContext =
315                 (MockRaftActorContext) createActorContext(getRef());
316             actorContext.setPeerAddresses(peerAddresses);
317
318             Map<String, String> leadersSnapshot = new HashMap<>();
319             leadersSnapshot.put("1", "A");
320             leadersSnapshot.put("2", "B");
321             leadersSnapshot.put("3", "C");
322
323             //clears leaders log
324             actorContext.getReplicatedLog().removeFrom(0);
325
326             final int followersLastIndex = 2;
327             final int snapshotIndex = 3;
328             final int newEntryIndex = 4;
329             final int snapshotTerm = 1;
330             final int currentTerm = 2;
331
332             // set the snapshot variables in replicatedlog
333             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
334             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
335             actorContext.setCommitIndex(followersLastIndex);
336
337             Leader leader = new Leader(actorContext);
338
339             // new entry
340             ReplicatedLogImplEntry entry =
341                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
342                     new MockRaftActorContext.MockPayload("D"));
343
344             //update follower timestamp
345             leader.markFollowerActive(followerActor.path().toString());
346
347             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
348             RaftActorBehavior raftBehavior = leader.handleMessage(
349                 senderActor, new Replicate(null, "state-id", entry));
350
351             assertTrue(raftBehavior instanceof Leader);
352
353             // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
354             Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
355                 @Override
356                 protected Boolean match(Object o) throws Exception {
357                     if (o instanceof InitiateInstallSnapshot) {
358                         return true;
359                     }
360                     return false;
361                 }
362             }.get();
363
364             boolean initiateInitiateInstallSnapshot = false;
365             for (Boolean b: matches) {
366                 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
367             }
368
369             assertTrue(initiateInitiateInstallSnapshot);
370         }};
371     }
372
373     @Test
374     public void testInitiateInstallSnapshot() throws Exception {
375         new JavaTestKit(getSystem()) {{
376
377             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
378
379             ActorRef followerActor = getTestActor();
380
381             Map<String, String> peerAddresses = new HashMap<>();
382             peerAddresses.put(followerActor.path().toString(),
383                 followerActor.path().toString());
384
385
386             MockRaftActorContext actorContext =
387                 (MockRaftActorContext) createActorContext(leaderActor);
388             actorContext.setPeerAddresses(peerAddresses);
389
390             Map<String, String> leadersSnapshot = new HashMap<>();
391             leadersSnapshot.put("1", "A");
392             leadersSnapshot.put("2", "B");
393             leadersSnapshot.put("3", "C");
394
395             //clears leaders log
396             actorContext.getReplicatedLog().removeFrom(0);
397
398             final int followersLastIndex = 2;
399             final int snapshotIndex = 3;
400             final int newEntryIndex = 4;
401             final int snapshotTerm = 1;
402             final int currentTerm = 2;
403
404             // set the snapshot variables in replicatedlog
405             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
406             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
407             actorContext.setLastApplied(3);
408             actorContext.setCommitIndex(followersLastIndex);
409
410             Leader leader = new Leader(actorContext);
411             // set the snapshot as absent and check if capture-snapshot is invoked.
412             leader.setSnapshot(Optional.<ByteString>absent());
413
414             // new entry
415             ReplicatedLogImplEntry entry =
416                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
417                     new MockRaftActorContext.MockPayload("D"));
418
419             actorContext.getReplicatedLog().append(entry);
420
421             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
422             RaftActorBehavior raftBehavior = leader.handleMessage(
423                 leaderActor, new InitiateInstallSnapshot());
424
425             CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
426                 getFirstMatching(leaderActor, CaptureSnapshot.class);
427
428             assertNotNull(cs);
429
430             assertTrue(cs.isInstallSnapshotInitiated());
431             assertEquals(3, cs.getLastAppliedIndex());
432             assertEquals(1, cs.getLastAppliedTerm());
433             assertEquals(4, cs.getLastIndex());
434             assertEquals(2, cs.getLastTerm());
435         }};
436     }
437
438     @Test
439     public void testInstallSnapshot() {
440         new JavaTestKit(getSystem()) {{
441
442             ActorRef followerActor = getTestActor();
443
444             Map<String, String> peerAddresses = new HashMap<>();
445             peerAddresses.put(followerActor.path().toString(),
446                 followerActor.path().toString());
447
448             MockRaftActorContext actorContext =
449                 (MockRaftActorContext) createActorContext();
450             actorContext.setPeerAddresses(peerAddresses);
451
452
453             Map<String, String> leadersSnapshot = new HashMap<>();
454             leadersSnapshot.put("1", "A");
455             leadersSnapshot.put("2", "B");
456             leadersSnapshot.put("3", "C");
457
458             //clears leaders log
459             actorContext.getReplicatedLog().removeFrom(0);
460
461             final int followersLastIndex = 2;
462             final int snapshotIndex = 3;
463             final int newEntryIndex = 4;
464             final int snapshotTerm = 1;
465             final int currentTerm = 2;
466
467             // set the snapshot variables in replicatedlog
468             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
469             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
470             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
471             actorContext.setCommitIndex(followersLastIndex);
472
473             Leader leader = new Leader(actorContext);
474
475             // new entry
476             ReplicatedLogImplEntry entry =
477                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
478                     new MockRaftActorContext.MockPayload("D"));
479
480             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
481                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
482
483             assertTrue(raftBehavior instanceof Leader);
484
485             // check if installsnapshot gets called with the correct values.
486             final String out =
487                 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
488                     // do not put code outside this method, will run afterwards
489                     @Override
490                     protected String match(Object in) {
491                         if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
492                             InstallSnapshot is = (InstallSnapshot)
493                                 SerializationUtils.fromSerializable(in);
494                             if (is.getData() == null) {
495                                 return "InstallSnapshot data is null";
496                             }
497                             if (is.getLastIncludedIndex() != snapshotIndex) {
498                                 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
499                             }
500                             if (is.getLastIncludedTerm() != snapshotTerm) {
501                                 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
502                             }
503                             if (is.getTerm() == currentTerm) {
504                                 return is.getTerm() + "!=" + currentTerm;
505                             }
506
507                             return "match";
508
509                         } else {
510                             return "message mismatch:" + in.getClass();
511                         }
512                     }
513                 }.get(); // this extracts the received message
514
515             assertEquals("match", out);
516         }};
517     }
518
519     @Test
520     public void testHandleInstallSnapshotReplyLastChunk() {
521         new JavaTestKit(getSystem()) {{
522
523             ActorRef followerActor = getTestActor();
524
525             Map<String, String> peerAddresses = new HashMap<>();
526             peerAddresses.put(followerActor.path().toString(),
527                 followerActor.path().toString());
528
529             final int followersLastIndex = 2;
530             final int snapshotIndex = 3;
531             final int newEntryIndex = 4;
532             final int snapshotTerm = 1;
533             final int currentTerm = 2;
534
535             MockRaftActorContext actorContext =
536                 (MockRaftActorContext) createActorContext();
537             actorContext.setPeerAddresses(peerAddresses);
538             actorContext.setCommitIndex(followersLastIndex);
539
540             MockLeader leader = new MockLeader(actorContext);
541
542             Map<String, String> leadersSnapshot = new HashMap<>();
543             leadersSnapshot.put("1", "A");
544             leadersSnapshot.put("2", "B");
545             leadersSnapshot.put("3", "C");
546
547             // set the snapshot variables in replicatedlog
548
549             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
550             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
551             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
552
553             ByteString bs = toByteString(leadersSnapshot);
554             leader.setSnapshot(Optional.of(bs));
555             leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
556             while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
557                 leader.getFollowerToSnapshot().getNextChunk();
558                 leader.getFollowerToSnapshot().incrementChunkIndex();
559             }
560
561             //clears leaders log
562             actorContext.getReplicatedLog().removeFrom(0);
563
564             RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
565                 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
566                     leader.getFollowerToSnapshot().getChunkIndex(), true));
567
568             assertTrue(raftBehavior instanceof Leader);
569
570             assertEquals(0, leader.followerSnapshotSize());
571             assertEquals(1, leader.followerLogSize());
572             assertNotNull(leader.getFollower(followerActor.path().toString()));
573             FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
574             assertEquals(snapshotIndex, fli.getMatchIndex());
575             assertEquals(snapshotIndex, fli.getMatchIndex());
576             assertEquals(snapshotIndex + 1, fli.getNextIndex());
577         }};
578     }
579
580     @Test
581     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
582         new JavaTestKit(getSystem()) {{
583
584             TestActorRef<MessageCollectorActor> followerActor =
585                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
586
587             Map<String, String> peerAddresses = new HashMap<>();
588             peerAddresses.put(followerActor.path().toString(),
589                     followerActor.path().toString());
590
591             final int followersLastIndex = 2;
592             final int snapshotIndex = 3;
593             final int snapshotTerm = 1;
594             final int currentTerm = 2;
595
596             MockRaftActorContext actorContext =
597                     (MockRaftActorContext) createActorContext();
598
599             actorContext.setConfigParams(new DefaultConfigParamsImpl(){
600                 @Override
601                 public int getSnapshotChunkSize() {
602                     return 50;
603                 }
604             });
605             actorContext.setPeerAddresses(peerAddresses);
606             actorContext.setCommitIndex(followersLastIndex);
607
608             MockLeader leader = new MockLeader(actorContext);
609
610             Map<String, String> leadersSnapshot = new HashMap<>();
611             leadersSnapshot.put("1", "A");
612             leadersSnapshot.put("2", "B");
613             leadersSnapshot.put("3", "C");
614
615             // set the snapshot variables in replicatedlog
616             actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
617             actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
618             actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
619
620             ByteString bs = toByteString(leadersSnapshot);
621             leader.setSnapshot(Optional.of(bs));
622
623             leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
624
625             Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
626
627             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
628
629             InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
630
631             assertEquals(1, installSnapshot.getChunkIndex());
632             assertEquals(3, installSnapshot.getTotalChunks());
633
634
635             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
636
637             leader.handleMessage(leaderActor, new SendHeartBeat());
638
639             o = MessageCollectorActor.getAllMessages(followerActor).get(1);
640
641             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
642
643             installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
644
645             assertEquals(1, installSnapshot.getChunkIndex());
646             assertEquals(3, installSnapshot.getTotalChunks());
647
648             followerActor.tell(PoisonPill.getInstance(), getRef());
649         }};
650     }
651
652     @Test
653     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
654         new JavaTestKit(getSystem()) {
655             {
656
657                 TestActorRef<MessageCollectorActor> followerActor =
658                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
659
660                 Map<String, String> peerAddresses = new HashMap<>();
661                 peerAddresses.put(followerActor.path().toString(),
662                         followerActor.path().toString());
663
664                 final int followersLastIndex = 2;
665                 final int snapshotIndex = 3;
666                 final int snapshotTerm = 1;
667                 final int currentTerm = 2;
668
669                 MockRaftActorContext actorContext =
670                         (MockRaftActorContext) createActorContext();
671
672                 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
673                     @Override
674                     public int getSnapshotChunkSize() {
675                         return 50;
676                     }
677                 });
678                 actorContext.setPeerAddresses(peerAddresses);
679                 actorContext.setCommitIndex(followersLastIndex);
680
681                 MockLeader leader = new MockLeader(actorContext);
682
683                 Map<String, String> leadersSnapshot = new HashMap<>();
684                 leadersSnapshot.put("1", "A");
685                 leadersSnapshot.put("2", "B");
686                 leadersSnapshot.put("3", "C");
687
688                 // set the snapshot variables in replicatedlog
689                 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
690                 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
691                 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
692
693                 ByteString bs = toByteString(leadersSnapshot);
694                 leader.setSnapshot(Optional.of(bs));
695
696                 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
697
698                 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
699
700                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
701
702                 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
703
704                 assertEquals(1, installSnapshot.getChunkIndex());
705                 assertEquals(3, installSnapshot.getTotalChunks());
706                 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
707
708                 int hashCode = installSnapshot.getData().hashCode();
709
710                 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
711
712                 leader.handleMessage(leaderActor, new SendHeartBeat());
713
714                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
715
716                 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
717
718                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
719
720                 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
721
722                 assertEquals(2, installSnapshot.getChunkIndex());
723                 assertEquals(3, installSnapshot.getTotalChunks());
724                 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
725
726                 followerActor.tell(PoisonPill.getInstance(), getRef());
727             }};
728     }
729
730     @Test
731     public void testFollowerToSnapshotLogic() {
732
733         MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
734
735         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
736             @Override
737             public int getSnapshotChunkSize() {
738                 return 50;
739             }
740         });
741
742         MockLeader leader = new MockLeader(actorContext);
743
744         Map<String, String> leadersSnapshot = new HashMap<>();
745         leadersSnapshot.put("1", "A");
746         leadersSnapshot.put("2", "B");
747         leadersSnapshot.put("3", "C");
748
749         ByteString bs = toByteString(leadersSnapshot);
750         byte[] barray = bs.toByteArray();
751
752         leader.createFollowerToSnapshot("followerId", bs);
753         assertEquals(bs.size(), barray.length);
754
755         int chunkIndex=0;
756         for (int i=0; i < barray.length; i = i + 50) {
757             int j = i + 50;
758             chunkIndex++;
759
760             if (i + 50 > barray.length) {
761                 j = barray.length;
762             }
763
764             ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
765             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
766             assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
767
768             leader.getFollowerToSnapshot().markSendStatus(true);
769             if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
770                 leader.getFollowerToSnapshot().incrementChunkIndex();
771             }
772         }
773
774         assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
775     }
776
777
778     @Override protected RaftActorBehavior createBehavior(
779         RaftActorContext actorContext) {
780         return new Leader(actorContext);
781     }
782
783     @Override protected RaftActorContext createActorContext() {
784         return createActorContext(leaderActor);
785     }
786
787     @Override
788     protected RaftActorContext createActorContext(ActorRef actorRef) {
789         return new MockRaftActorContext("test", getSystem(), actorRef);
790     }
791
792     private ByteString toByteString(Map<String, String> state) {
793         ByteArrayOutputStream b = null;
794         ObjectOutputStream o = null;
795         try {
796             try {
797                 b = new ByteArrayOutputStream();
798                 o = new ObjectOutputStream(b);
799                 o.writeObject(state);
800                 byte[] snapshotBytes = b.toByteArray();
801                 return ByteString.copyFrom(snapshotBytes);
802             } finally {
803                 if (o != null) {
804                     o.flush();
805                     o.close();
806                 }
807                 if (b != null) {
808                     b.close();
809                 }
810             }
811         } catch (IOException e) {
812             Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
813         }
814         return null;
815     }
816
817     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
818         private static AbstractRaftActorBehavior behavior;
819
820         public ForwardMessageToBehaviorActor(){
821
822         }
823
824         @Override public void onReceive(Object message) throws Exception {
825             super.onReceive(message);
826             behavior.handleMessage(sender(), message);
827         }
828
829         public static void setBehavior(AbstractRaftActorBehavior behavior){
830             ForwardMessageToBehaviorActor.behavior = behavior;
831         }
832     }
833
834     @Test
835     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
836         new JavaTestKit(getSystem()) {{
837
838             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
839
840             MockRaftActorContext leaderActorContext =
841                 new MockRaftActorContext("leader", getSystem(), leaderActor);
842
843             ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
844
845             MockRaftActorContext followerActorContext =
846                 new MockRaftActorContext("follower", getSystem(), followerActor);
847
848             Follower follower = new Follower(followerActorContext);
849
850             ForwardMessageToBehaviorActor.setBehavior(follower);
851
852             Map<String, String> peerAddresses = new HashMap<>();
853             peerAddresses.put(followerActor.path().toString(),
854                 followerActor.path().toString());
855
856             leaderActorContext.setPeerAddresses(peerAddresses);
857
858             leaderActorContext.getReplicatedLog().removeFrom(0);
859
860             //create 3 entries
861             leaderActorContext.setReplicatedLog(
862                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
863
864             leaderActorContext.setCommitIndex(1);
865
866             followerActorContext.getReplicatedLog().removeFrom(0);
867
868             // follower too has the exact same log entries and has the same commit index
869             followerActorContext.setReplicatedLog(
870                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
871
872             followerActorContext.setCommitIndex(1);
873
874             Leader leader = new Leader(leaderActorContext);
875             leader.markFollowerActive(followerActor.path().toString());
876
877             leader.handleMessage(leaderActor, new SendHeartBeat());
878
879             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
880                     .getFirstMatching(followerActor, AppendEntries.class);
881
882             assertNotNull(appendEntries);
883
884             assertEquals(1, appendEntries.getLeaderCommit());
885             assertEquals(1, appendEntries.getEntries().get(0).getIndex());
886             assertEquals(0, appendEntries.getPrevLogIndex());
887
888             AppendEntriesReply appendEntriesReply =
889                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
890                     leaderActor, AppendEntriesReply.class);
891
892             assertNotNull(appendEntriesReply);
893
894             // follower returns its next index
895             assertEquals(2, appendEntriesReply.getLogLastIndex());
896             assertEquals(1, appendEntriesReply.getLogLastTerm());
897
898         }};
899     }
900
901
902     @Test
903     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
904         new JavaTestKit(getSystem()) {{
905
906             ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
907
908             MockRaftActorContext leaderActorContext =
909                 new MockRaftActorContext("leader", getSystem(), leaderActor);
910
911             ActorRef followerActor = getSystem().actorOf(
912                 Props.create(ForwardMessageToBehaviorActor.class));
913
914             MockRaftActorContext followerActorContext =
915                 new MockRaftActorContext("follower", getSystem(), followerActor);
916
917             Follower follower = new Follower(followerActorContext);
918
919             ForwardMessageToBehaviorActor.setBehavior(follower);
920
921             Map<String, String> peerAddresses = new HashMap<>();
922             peerAddresses.put(followerActor.path().toString(),
923                 followerActor.path().toString());
924
925             leaderActorContext.setPeerAddresses(peerAddresses);
926
927             leaderActorContext.getReplicatedLog().removeFrom(0);
928
929             leaderActorContext.setReplicatedLog(
930                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
931
932             leaderActorContext.setCommitIndex(1);
933
934             followerActorContext.getReplicatedLog().removeFrom(0);
935
936             followerActorContext.setReplicatedLog(
937                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
938
939             // follower has the same log entries but its commit index > leaders commit index
940             followerActorContext.setCommitIndex(2);
941
942             Leader leader = new Leader(leaderActorContext);
943             leader.markFollowerActive(followerActor.path().toString());
944
945             leader.handleMessage(leaderActor, new SendHeartBeat());
946
947             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
948                     .getFirstMatching(followerActor, AppendEntries.class);
949
950             assertNotNull(appendEntries);
951
952             assertEquals(1, appendEntries.getLeaderCommit());
953             assertEquals(1, appendEntries.getEntries().get(0).getIndex());
954             assertEquals(0, appendEntries.getPrevLogIndex());
955
956             AppendEntriesReply appendEntriesReply =
957                 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
958                     leaderActor, AppendEntriesReply.class);
959
960             assertNotNull(appendEntriesReply);
961
962             assertEquals(2, appendEntriesReply.getLogLastIndex());
963             assertEquals(1, appendEntriesReply.getLogLastTerm());
964
965         }};
966     }
967
968     @Test
969     public void testHandleAppendEntriesReplyFailure(){
970         new JavaTestKit(getSystem()) {
971             {
972
973                 ActorRef leaderActor =
974                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
975
976                 ActorRef followerActor =
977                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
978
979
980                 MockRaftActorContext leaderActorContext =
981                     new MockRaftActorContext("leader", getSystem(), leaderActor);
982
983                 Map<String, String> peerAddresses = new HashMap<>();
984                 peerAddresses.put("follower-1",
985                     followerActor.path().toString());
986
987                 leaderActorContext.setPeerAddresses(peerAddresses);
988
989                 Leader leader = new Leader(leaderActorContext);
990
991                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
992
993                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
994
995                 assertEquals(RaftState.Leader, raftActorBehavior.state());
996
997             }};
998     }
999
1000     @Test
1001     public void testHandleAppendEntriesReplySuccess() throws Exception {
1002         new JavaTestKit(getSystem()) {
1003             {
1004
1005                 ActorRef leaderActor =
1006                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1007
1008                 ActorRef followerActor =
1009                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1010
1011
1012                 MockRaftActorContext leaderActorContext =
1013                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1014
1015                 leaderActorContext.setReplicatedLog(
1016                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1017
1018                 Map<String, String> peerAddresses = new HashMap<>();
1019                 peerAddresses.put("follower-1",
1020                     followerActor.path().toString());
1021
1022                 leaderActorContext.setPeerAddresses(peerAddresses);
1023                 leaderActorContext.setCommitIndex(1);
1024                 leaderActorContext.setLastApplied(1);
1025                 leaderActorContext.getTermInformation().update(1, "leader");
1026
1027                 Leader leader = new Leader(leaderActorContext);
1028
1029                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1030
1031                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1032
1033                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1034
1035                 assertEquals(2, leaderActorContext.getCommitIndex());
1036
1037                 ApplyLogEntries applyLogEntries =
1038                     (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
1039                         ApplyLogEntries.class);
1040
1041                 assertNotNull(applyLogEntries);
1042
1043                 assertEquals(2, leaderActorContext.getLastApplied());
1044
1045                 assertEquals(2, applyLogEntries.getToIndex());
1046
1047                 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1048                     ApplyState.class);
1049
1050                 assertEquals(1,applyStateList.size());
1051
1052                 ApplyState applyState = (ApplyState) applyStateList.get(0);
1053
1054                 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1055
1056             }};
1057     }
1058
1059     @Test
1060     public void testHandleAppendEntriesReplyUnknownFollower(){
1061         new JavaTestKit(getSystem()) {
1062             {
1063
1064                 ActorRef leaderActor =
1065                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1066
1067                 MockRaftActorContext leaderActorContext =
1068                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1069
1070                 Leader leader = new Leader(leaderActorContext);
1071
1072                 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1073
1074                 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1075
1076                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1077
1078             }};
1079     }
1080
1081     @Test
1082     public void testHandleRequestVoteReply(){
1083         new JavaTestKit(getSystem()) {
1084             {
1085
1086                 ActorRef leaderActor =
1087                     getSystem().actorOf(Props.create(MessageCollectorActor.class));
1088
1089                 MockRaftActorContext leaderActorContext =
1090                     new MockRaftActorContext("leader", getSystem(), leaderActor);
1091
1092                 Leader leader = new Leader(leaderActorContext);
1093
1094                 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1095
1096                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1097
1098                 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1099
1100                 assertEquals(RaftState.Leader, raftActorBehavior.state());
1101             }};
1102     }
1103
1104     @Test
1105     public void testIsolatedLeaderCheckNoFollowers() {
1106         new JavaTestKit(getSystem()) {{
1107             ActorRef leaderActor = getTestActor();
1108
1109             MockRaftActorContext leaderActorContext =
1110                 new MockRaftActorContext("leader", getSystem(), leaderActor);
1111
1112             Map<String, String> peerAddresses = new HashMap<>();
1113             leaderActorContext.setPeerAddresses(peerAddresses);
1114
1115             Leader leader = new Leader(leaderActorContext);
1116             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1117             Assert.assertTrue(behavior instanceof Leader);
1118         }};
1119     }
1120
1121     @Test
1122     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1123         new JavaTestKit(getSystem()) {{
1124
1125             ActorRef followerActor1 = getTestActor();
1126             ActorRef followerActor2 = getTestActor();
1127
1128             MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1129
1130             Map<String, String> peerAddresses = new HashMap<>();
1131             peerAddresses.put("follower-1", followerActor1.path().toString());
1132             peerAddresses.put("follower-2", followerActor2.path().toString());
1133
1134             leaderActorContext.setPeerAddresses(peerAddresses);
1135
1136             Leader leader = new Leader(leaderActorContext);
1137             leader.stopIsolatedLeaderCheckSchedule();
1138
1139             leader.markFollowerActive("follower-1");
1140             leader.markFollowerActive("follower-2");
1141             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1142             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1143                 behavior instanceof Leader);
1144
1145             // kill 1 follower and verify if that got killed
1146             final JavaTestKit probe = new JavaTestKit(getSystem());
1147             probe.watch(followerActor1);
1148             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1149             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1150             assertEquals(termMsg1.getActor(), followerActor1);
1151
1152             leader.markFollowerInActive("follower-1");
1153             leader.markFollowerActive("follower-2");
1154             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1155             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1156                 behavior instanceof Leader);
1157
1158             // kill 2nd follower and leader should change to Isolated leader
1159             followerActor2.tell(PoisonPill.getInstance(), null);
1160             probe.watch(followerActor2);
1161             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1162             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1163             assertEquals(termMsg2.getActor(), followerActor2);
1164
1165             leader.markFollowerInActive("follower-2");
1166             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1167             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1168                 behavior instanceof IsolatedLeader);
1169
1170         }};
1171     }
1172
1173     class MockLeader extends Leader {
1174
1175         FollowerToSnapshot fts;
1176
1177         public MockLeader(RaftActorContext context){
1178             super(context);
1179         }
1180
1181         public FollowerToSnapshot getFollowerToSnapshot() {
1182             return fts;
1183         }
1184
1185         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1186             fts = new FollowerToSnapshot(bs);
1187             setFollowerSnapshot(followerId, fts);
1188         }
1189     }
1190
1191     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1192
1193         private final long electionTimeOutIntervalMillis;
1194         private final int snapshotChunkSize;
1195
1196         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1197             super();
1198             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1199             this.snapshotChunkSize = snapshotChunkSize;
1200         }
1201
1202         @Override
1203         public FiniteDuration getElectionTimeOutInterval() {
1204             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1205         }
1206
1207         @Override
1208         public int getSnapshotChunkSize() {
1209             return snapshotChunkSize;
1210         }
1211     }
1212 }