1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.util.HashMap;
19 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Assert;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftState;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
47 import scala.concurrent.duration.FiniteDuration;
49 public class LeaderTest extends AbstractRaftActorBehaviorTest {
// Shared do-nothing actors used by the tests in this class: leaderActor backs the default
// actor context (createActorContext() passes it on), senderActor is handed to
// handleMessage(...) as the sender of the message under test.
51 private final ActorRef leaderActor =
52 getSystem().actorOf(Props.create(DoNothingActor.class));
53 private final ActorRef senderActor =
54 getSystem().actorOf(Props.create(DoNothingActor.class));
// Hands the Leader a message type it has no handler for (the plain string "foo") and checks
// that the behavior returned by handleMessage is still a Leader.
57 public void testHandleMessageForUnknownMessage() throws Exception {
58 new JavaTestKit(getSystem()) {{
60 new Leader(createActorContext());
62 // handle message should return the Leader state when it receives an
64 RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
65 Assert.assertTrue(behavior instanceof Leader);
// Registers the test actor as the only peer, waits one heartbeat interval, then hands the
// Leader a SendHeartBeat. The matcher below inspects what the test actor receives and looks
// for an AppendEntries whose term is 0.
70 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
71 new JavaTestKit(getSystem()) {{
73 new Within(duration("1 seconds")) {
75 protected void run() {
// the test actor plays the follower, so messages sent to the peer land in this test kit
77 ActorRef followerActor = getTestActor();
79 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
81 Map<String, String> peerAddresses = new HashMap<>();
// the follower's actor path is used both as the peer id and as its address
83 peerAddresses.put(followerActor.path().toString(),
84 followerActor.path().toString());
86 actorContext.setPeerAddresses(peerAddresses);
88 Leader leader = new Leader(actorContext);
89 leader.markFollowerActive(followerActor.path().toString());
// wait out one heartbeat interval before triggering the heartbeat
90 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
91 TimeUnit.MILLISECONDS);
92 leader.handleMessage(senderActor, new SendHeartBeat());
95 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
96 // do not put code outside this method, will run afterwards
98 protected String match(Object in) {
99 Object msg = fromSerializableMessage(in);
100 if (msg instanceof AppendEntries) {
101 if (((AppendEntries)msg).getTerm() == 0) {
109 }.get(); // this extracts the received message
111 assertEquals("match", out);
// Registers the test actor as the only peer, waits one heartbeat interval, then hands the
// Leader a Replicate message carrying a mock log entry with payload "foo". The Leader must
// stay a Leader, and the matcher below looks for an AppendEntries with term 0 at the follower.
119 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
120 new JavaTestKit(getSystem()) {{
122 new Within(duration("1 seconds")) {
124 protected void run() {
// the test actor plays the follower
126 ActorRef followerActor = getTestActor();
128 MockRaftActorContext actorContext =
129 (MockRaftActorContext) createActorContext();
131 Map<String, String> peerAddresses = new HashMap<>();
// the follower's actor path is used both as the peer id and as its address
133 peerAddresses.put(followerActor.path().toString(),
134 followerActor.path().toString());
136 actorContext.setPeerAddresses(peerAddresses);
138 Leader leader = new Leader(actorContext);
139 leader.markFollowerActive(followerActor.path().toString());
// wait out one heartbeat interval before sending the Replicate message
140 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
141 TimeUnit.MILLISECONDS);
142 RaftActorBehavior raftBehavior = leader
143 .handleMessage(senderActor, new Replicate(null, null,
144 new MockRaftActorContext.MockReplicatedLogEntry(1,
146 new MockRaftActorContext.MockPayload("foo"))
149 // State should not change
150 assertTrue(raftBehavior instanceof Leader);
153 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
154 // do not put code outside this method, will run afterwards
156 protected String match(Object in) {
157 Object msg = fromSerializableMessage(in);
158 if (msg instanceof AppendEntries) {
159 if (((AppendEntries)msg).getTerm() == 0) {
167 }.get(); // this extracts the received message
169 assertEquals("match", out);
// Builds a Leader with no peer addresses configured and hands it a Replicate message for the
// log entry at index 1 with identifier "state-id". The Leader must stay a Leader, the commit
// index must become 1, and the matcher below looks for an ApplyState carrying "state-id" at
// the test actor, which is the context's own actor here.
176 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
177 new JavaTestKit(getSystem()) {{
179 new Within(duration("1 seconds")) {
181 protected void run() {
183 ActorRef raftActor = getTestActor();
185 MockRaftActorContext actorContext =
186 new MockRaftActorContext("test", getSystem(), raftActor);
// drop whatever the mock context starts with before installing the generated log
188 actorContext.getReplicatedLog().removeFrom(0);
190 actorContext.setReplicatedLog(
191 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
194 Leader leader = new Leader(actorContext);
195 RaftActorBehavior raftBehavior = leader
196 .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
198 // State should not change
199 assertTrue(raftBehavior instanceof Leader);
201 assertEquals(1, actorContext.getCommitIndex());
204 new ExpectMsg<String>(duration("1 seconds"),
206 // do not put code outside this method, will run afterwards
208 protected String match(Object in) {
209 if (in instanceof ApplyState) {
210 if (((ApplyState) in).getIdentifier().equals("state-id")) {
218 }.get(); // this extracts the received message
220 assertEquals("match", out);
// Simulates a snapshot install that is part-way through: the first chunk has been taken and
// no reply has been recorded. A heartbeat in that state must still produce an AppendEntries
// with no entries at the follower. After the chunk is marked as sent successfully, the next
// heartbeat must produce an InstallSnapshot whose last-included index equals snapshotIndex.
228 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
229 new JavaTestKit(getSystem()) {{
// a collector actor stands in for the follower so its received messages can be inspected
230 ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
232 Map<String, String> peerAddresses = new HashMap<>();
233 peerAddresses.put(followerActor.path().toString(),
234 followerActor.path().toString());
236 MockRaftActorContext actorContext =
237 (MockRaftActorContext) createActorContext(leaderActor);
238 actorContext.setPeerAddresses(peerAddresses);
// state that is serialized below and used as the leader's snapshot
240 Map<String, String> leadersSnapshot = new HashMap<>();
241 leadersSnapshot.put("1", "A");
242 leadersSnapshot.put("2", "B");
243 leadersSnapshot.put("3", "C");
246 actorContext.getReplicatedLog().removeFrom(0);
248 final int followersLastIndex = 2;
249 final int snapshotIndex = 3;
250 final int newEntryIndex = 4;
251 final int snapshotTerm = 1;
252 final int currentTerm = 2;
254 // set the snapshot variables in replicatedlog
255 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
256 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
257 actorContext.setCommitIndex(followersLastIndex);
258 //set follower timeout to 2 mins, helps during debugging
259 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
261 MockLeader leader = new MockLeader(actorContext);
// NOTE(review): entry is not referenced again in the visible part of this test - confirm it is needed
264 ReplicatedLogImplEntry entry =
265 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
266 new MockRaftActorContext.MockPayload("D"));
268 //update follower timestamp
269 leader.markFollowerActive(followerActor.path().toString());
271 ByteString bs = toByteString(leadersSnapshot);
272 leader.setSnapshot(Optional.of(bs));
273 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
275 //send first chunk and no InstallSnapshotReply received yet
276 leader.getFollowerToSnapshot().getNextChunk();
277 leader.getFollowerToSnapshot().incrementChunkIndex();
// wait out one heartbeat interval before triggering the heartbeat
279 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
280 TimeUnit.MILLISECONDS);
282 leader.handleMessage(leaderActor, new SendHeartBeat());
284 AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
285 followerActor, AppendEntries.class);
287 assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
288 "received", aeproto);
290 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
292 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
294 //InstallSnapshotReply received
295 leader.getFollowerToSnapshot().markSendStatus(true);
// NOTE(review): this heartbeat is sent with senderActor while the first used leaderActor - confirm the difference is intentional
297 leader.handleMessage(senderActor, new SendHeartBeat());
299 InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
300 MessageCollectorActor.getFirstMatching(followerActor,
301 InstallSnapshot.SERIALIZABLE_CLASS);
303 assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
306 InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
308 assertEquals(snapshotIndex, is.getLastIncludedIndex());
// Sets the log's snapshot index (3) ahead of the commit index used for the follower (2) and
// hands the Leader a Replicate message. The Leader must stay a Leader, and an
// InitiateInstallSnapshot must show up among the messages collected by the test kit.
314 public void testSendAppendEntriesSnapshotScenario() {
315 new JavaTestKit(getSystem()) {{
317 ActorRef followerActor = getTestActor();
319 Map<String, String> peerAddresses = new HashMap<>();
320 peerAddresses.put(followerActor.path().toString(),
321 followerActor.path().toString());
// the context's actor is this test kit's ref, so messages the leader sends to its own actor are observable here
323 MockRaftActorContext actorContext =
324 (MockRaftActorContext) createActorContext(getRef());
325 actorContext.setPeerAddresses(peerAddresses);
// NOTE(review): leadersSnapshot is populated but not used in the visible part of this test - confirm
327 Map<String, String> leadersSnapshot = new HashMap<>();
328 leadersSnapshot.put("1", "A");
329 leadersSnapshot.put("2", "B");
330 leadersSnapshot.put("3", "C");
333 actorContext.getReplicatedLog().removeFrom(0);
335 final int followersLastIndex = 2;
336 final int snapshotIndex = 3;
337 final int newEntryIndex = 4;
338 final int snapshotTerm = 1;
339 final int currentTerm = 2;
341 // set the snapshot variables in replicatedlog
342 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
343 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
344 actorContext.setCommitIndex(followersLastIndex);
346 Leader leader = new Leader(actorContext);
349 ReplicatedLogImplEntry entry =
350 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
351 new MockRaftActorContext.MockPayload("D"));
353 //update follower timestamp
354 leader.markFollowerActive(followerActor.path().toString());
// wait out one heartbeat interval before sending the Replicate message
356 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
357 TimeUnit.MILLISECONDS);
359 // this should trigger an InitiateInstallSnapshot as followersLastIndex < snapshotIndex
360 RaftActorBehavior raftBehavior = leader.handleMessage(
361 senderActor, new Replicate(null, "state-id", entry));
363 assertTrue(raftBehavior instanceof Leader);
365 // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
366 Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
368 protected Boolean match(Object o) throws Exception {
369 if (o instanceof InitiateInstallSnapshot) {
// OR the per-message results together: the flag ends up true if any element of matches is true
376 boolean initiateInitiateInstallSnapshot = false;
377 for (Boolean b: matches) {
378 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
381 assertTrue(initiateInitiateInstallSnapshot);
// With the leader's cached snapshot set to absent, handing it an InitiateInstallSnapshot must
// make the leader's actor (a message collector) receive a CaptureSnapshot. The test checks
// that message's install-snapshot flag and its last-applied / last index and term values.
386 public void testInitiateInstallSnapshot() throws Exception {
387 new JavaTestKit(getSystem()) {{
// local collector actor (shadows the class-level leaderActor field) so CaptureSnapshot can be read back
389 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
391 ActorRef followerActor = getTestActor();
393 Map<String, String> peerAddresses = new HashMap<>();
394 peerAddresses.put(followerActor.path().toString(),
395 followerActor.path().toString());
398 MockRaftActorContext actorContext =
399 (MockRaftActorContext) createActorContext(leaderActor);
400 actorContext.setPeerAddresses(peerAddresses);
// NOTE(review): leadersSnapshot is populated but not used in the visible part of this test - confirm
402 Map<String, String> leadersSnapshot = new HashMap<>();
403 leadersSnapshot.put("1", "A");
404 leadersSnapshot.put("2", "B");
405 leadersSnapshot.put("3", "C");
408 actorContext.getReplicatedLog().removeFrom(0);
410 final int followersLastIndex = 2;
411 final int snapshotIndex = 3;
412 final int newEntryIndex = 4;
413 final int snapshotTerm = 1;
414 final int currentTerm = 2;
416 // set the snapshot variables in replicatedlog
417 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
418 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
419 actorContext.setLastApplied(3);
420 actorContext.setCommitIndex(followersLastIndex);
422 Leader leader = new Leader(actorContext);
423 // set the snapshot as absent and check if capture-snapshot is invoked.
424 leader.setSnapshot(Optional.<ByteString>absent());
427 ReplicatedLogImplEntry entry =
428 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
429 new MockRaftActorContext.MockPayload("D"));
// the appended entry (index 4, term 2) is what the last index/term assertions below expect
431 actorContext.getReplicatedLog().append(entry);
433 // with no cached snapshot this should make the leader's actor receive a CaptureSnapshot
434 RaftActorBehavior raftBehavior = leader.handleMessage(
435 leaderActor, new InitiateInstallSnapshot());
437 CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
438 getFirstMatching(leaderActor, CaptureSnapshot.class);
442 assertTrue(cs.isInstallSnapshotInitiated());
443 assertEquals(3, cs.getLastAppliedIndex());
444 assertEquals(1, cs.getLastAppliedTerm());
445 assertEquals(4, cs.getLastIndex());
446 assertEquals(2, cs.getLastTerm());
// Hands the Leader a SendInstallSnapshot carrying the serialized leadersSnapshot map. The
// Leader must stay a Leader, and the follower (the test actor) must receive an InstallSnapshot
// whose data is non-null and whose last-included index, last-included term and term equal
// snapshotIndex, snapshotTerm and currentTerm. On any mismatch the matcher returns a
// descriptive string, which then fails the final assertEquals("match", out).
451 public void testInstallSnapshot() {
452 new JavaTestKit(getSystem()) {{
// the test actor plays the follower
454 ActorRef followerActor = getTestActor();
456 Map<String, String> peerAddresses = new HashMap<>();
457 peerAddresses.put(followerActor.path().toString(),
458 followerActor.path().toString());
460 MockRaftActorContext actorContext =
461 (MockRaftActorContext) createActorContext();
462 actorContext.setPeerAddresses(peerAddresses);
465 Map<String, String> leadersSnapshot = new HashMap<>();
466 leadersSnapshot.put("1", "A");
467 leadersSnapshot.put("2", "B");
468 leadersSnapshot.put("3", "C");
471 actorContext.getReplicatedLog().removeFrom(0);
473 final int followersLastIndex = 2;
474 final int snapshotIndex = 3;
475 final int newEntryIndex = 4;
476 final int snapshotTerm = 1;
477 final int currentTerm = 2;
479 // set the snapshot variables in replicatedlog
480 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
481 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
// the leader's term is set to currentTerm; the matcher below compares the message's term against it
482 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
483 actorContext.setCommitIndex(followersLastIndex);
485 Leader leader = new Leader(actorContext);
488 ReplicatedLogImplEntry entry =
489 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
490 new MockRaftActorContext.MockPayload("D"));
492 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
493 new SendInstallSnapshot(toByteString(leadersSnapshot)));
495 assertTrue(raftBehavior instanceof Leader);
497 // check if installsnapshot gets called with the correct values.
499 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
500 // do not put code outside this method, will run afterwards
502 protected String match(Object in) {
503 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
504 InstallSnapshot is = (InstallSnapshot)
505 SerializationUtils.fromSerializable(in);
506 if (is.getData() == null) {
507 return "InstallSnapshot data is null";
509 if (is.getLastIncludedIndex() != snapshotIndex) {
510 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
512 if (is.getLastIncludedTerm() != snapshotTerm) {
513 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
// report a mismatch only when the terms differ; the previous '==' flagged the matching case
515 if (is.getTerm() != currentTerm) {
516 return is.getTerm() + "!=" + currentTerm;
522 return "message mismatch:" + in.getClass();
525 }.get(); // this extracts the received message
527 assertEquals("match", out);
// Advances the leader's snapshot-transfer state for the follower up to its last chunk, then
// hands the Leader a successful InstallSnapshotReply for that chunk index. Afterwards the
// Leader must still be a Leader, no follower snapshot state may remain, and the follower's
// match index / next index must be snapshotIndex / snapshotIndex + 1.
532 public void testHandleInstallSnapshotReplyLastChunk() {
533 new JavaTestKit(getSystem()) {{
535 ActorRef followerActor = getTestActor();
537 Map<String, String> peerAddresses = new HashMap<>();
538 peerAddresses.put(followerActor.path().toString(),
539 followerActor.path().toString());
541 final int followersLastIndex = 2;
542 final int snapshotIndex = 3;
543 final int newEntryIndex = 4;
544 final int snapshotTerm = 1;
545 final int currentTerm = 2;
547 MockRaftActorContext actorContext =
548 (MockRaftActorContext) createActorContext();
549 actorContext.setPeerAddresses(peerAddresses);
550 actorContext.setCommitIndex(followersLastIndex);
552 MockLeader leader = new MockLeader(actorContext);
554 Map<String, String> leadersSnapshot = new HashMap<>();
555 leadersSnapshot.put("1", "A");
556 leadersSnapshot.put("2", "B");
557 leadersSnapshot.put("3", "C");
559 // set the snapshot variables in replicatedlog
561 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
562 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
563 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
565 ByteString bs = toByteString(leadersSnapshot);
566 leader.setSnapshot(Optional.of(bs));
567 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
// step through the chunks until the transfer state points at the last one
568 while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
569 leader.getFollowerToSnapshot().getNextChunk();
570 leader.getFollowerToSnapshot().incrementChunkIndex();
574 actorContext.getReplicatedLog().removeFrom(0);
576 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
577 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
578 leader.getFollowerToSnapshot().getChunkIndex(), true));
580 assertTrue(raftBehavior instanceof Leader);
582 assertEquals(0, leader.followerSnapshotSize());
583 assertEquals(1, leader.followerLogSize());
584 assertNotNull(leader.getFollower(followerActor.path().toString()));
585 FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
// match index is asserted once; the identical second assertion was redundant and has been dropped
586 assertEquals(snapshotIndex, fli.getMatchIndex());
588 assertEquals(snapshotIndex + 1, fli.getNextIndex());
// Starts a snapshot install and checks the first InstallSnapshot at the follower is chunk 1
// of 3. Then feeds the Leader a reply built with chunk index -1 and 'false', waits one
// heartbeat interval and sends a heartbeat. The second InstallSnapshot collected by the
// follower must again be chunk 1 of 3.
593 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
594 new JavaTestKit(getSystem()) {{
// named collector actor stands in for the follower; it is stopped with a PoisonPill at the end
596 TestActorRef<MessageCollectorActor> followerActor =
597 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
599 Map<String, String> peerAddresses = new HashMap<>();
600 peerAddresses.put(followerActor.path().toString(),
601 followerActor.path().toString());
603 final int followersLastIndex = 2;
604 final int snapshotIndex = 3;
605 final int snapshotTerm = 1;
606 final int currentTerm = 2;
608 MockRaftActorContext actorContext =
609 (MockRaftActorContext) createActorContext();
// override the snapshot chunk size so the snapshot is split into several chunks (3 are asserted below)
611 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
613 public int getSnapshotChunkSize() {
617 actorContext.setPeerAddresses(peerAddresses);
618 actorContext.setCommitIndex(followersLastIndex);
620 MockLeader leader = new MockLeader(actorContext);
622 Map<String, String> leadersSnapshot = new HashMap<>();
623 leadersSnapshot.put("1", "A");
624 leadersSnapshot.put("2", "B");
625 leadersSnapshot.put("3", "C");
627 // set the snapshot variables in replicatedlog
628 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
629 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
630 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
632 ByteString bs = toByteString(leadersSnapshot);
633 leader.setSnapshot(Optional.of(bs));
635 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
// first message collected by the follower is expected to be the first snapshot chunk
637 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
639 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
641 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
643 assertEquals(1, installSnapshot.getChunkIndex());
644 assertEquals(3, installSnapshot.getTotalChunks());
// reply with chunk index -1 and 'false' (presumably the success flag - confirm against InstallSnapshotReply)
647 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
648 followerActor.path().toString(), -1, false));
// wait out one heartbeat interval before triggering the heartbeat
650 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
651 TimeUnit.MILLISECONDS);
653 leader.handleMessage(leaderActor, new SendHeartBeat());
// second InstallSnapshot collected by the follower
655 o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
657 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
659 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
// same chunk index as before: the first chunk was sent again
661 assertEquals(1, installSnapshot.getChunkIndex());
662 assertEquals(3, installSnapshot.getTotalChunks());
664 followerActor.tell(PoisonPill.getInstance(), getRef());
// Starts a snapshot install and checks the first chunk (1 of 3) carries
// AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE as its last-chunk hash code. After a successful
// reply for chunk 1 and a heartbeat, the second message at the follower must be chunk 2 of 3
// carrying the hash code of the first chunk's data.
669 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
670 new JavaTestKit(getSystem()) {
// named collector actor stands in for the follower; it is stopped with a PoisonPill at the end
673 TestActorRef<MessageCollectorActor> followerActor =
674 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
676 Map<String, String> peerAddresses = new HashMap<>();
677 peerAddresses.put(followerActor.path().toString(),
678 followerActor.path().toString());
680 final int followersLastIndex = 2;
681 final int snapshotIndex = 3;
682 final int snapshotTerm = 1;
683 final int currentTerm = 2;
685 MockRaftActorContext actorContext =
686 (MockRaftActorContext) createActorContext();
// override the snapshot chunk size so the snapshot is split into several chunks (3 are asserted below)
688 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
690 public int getSnapshotChunkSize() {
694 actorContext.setPeerAddresses(peerAddresses);
695 actorContext.setCommitIndex(followersLastIndex);
697 MockLeader leader = new MockLeader(actorContext);
699 Map<String, String> leadersSnapshot = new HashMap<>();
700 leadersSnapshot.put("1", "A");
701 leadersSnapshot.put("2", "B");
702 leadersSnapshot.put("3", "C");
704 // set the snapshot variables in replicatedlog
705 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
706 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
707 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
709 ByteString bs = toByteString(leadersSnapshot);
710 leader.setSnapshot(Optional.of(bs));
712 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
// first message collected by the follower is expected to be the first snapshot chunk
714 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
716 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
718 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
720 assertEquals(1, installSnapshot.getChunkIndex());
721 assertEquals(3, installSnapshot.getTotalChunks());
722 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
// remember the first chunk's hash so it can be compared with what the second chunk reports
724 int hashCode = installSnapshot.getData().hashCode();
726 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
// NOTE(review): fixed 100 ms sleep here, while sibling tests sleep for getHeartBeatInterval() - confirm the two agree
728 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
730 leader.handleMessage(leaderActor, new SendHeartBeat());
732 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
734 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
736 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
738 assertEquals(2, installSnapshot.getChunkIndex());
739 assertEquals(3, installSnapshot.getTotalChunks());
740 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
742 followerActor.tell(PoisonPill.getInstance(), getRef());
// Exercises the leader's follower-to-snapshot chunking state directly, without sending any
// messages. It walks the serialized snapshot in strides of 50 bytes, compares each chunk's
// size and index with what getFollowerToSnapshot() reports, and finally compares the number
// of chunks seen with getTotalChunks().
747 public void testFollowerToSnapshotLogic() {
749 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
// override the snapshot chunk size (presumably to match the 50-byte stride used below - confirm)
751 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
753 public int getSnapshotChunkSize() {
758 MockLeader leader = new MockLeader(actorContext);
760 Map<String, String> leadersSnapshot = new HashMap<>();
761 leadersSnapshot.put("1", "A");
762 leadersSnapshot.put("2", "B");
763 leadersSnapshot.put("3", "C");
765 ByteString bs = toByteString(leadersSnapshot);
766 byte[] barray = bs.toByteArray();
768 leader.createFollowerToSnapshot("followerId", bs);
769 assertEquals(bs.size(), barray.length);
// i is the start offset of the current chunk within the serialized snapshot
772 for (int i=0; i < barray.length; i = i + 50) {
// the final stride may run past the end of the array
776 if (i + 50 > barray.length) {
780 ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
781 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
782 assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
// record the chunk as sent successfully and advance unless this was the last chunk
784 leader.getFollowerToSnapshot().markSendStatus(true);
785 if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
786 leader.getFollowerToSnapshot().incrementChunkIndex();
790 assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
// Supplies the behavior under test to the abstract base test: a Leader built on the given context.
794 @Override protected RaftActorBehavior createBehavior(
795 RaftActorContext actorContext) {
796 return new Leader(actorContext);
// Default context for these tests: delegates to createActorContext(ActorRef) with the shared leaderActor.
799 @Override protected RaftActorContext createActorContext() {
800 return createActorContext(leaderActor);
// Builds a MockRaftActorContext with id "test" on the test actor system, backed by the given actor.
804 protected RaftActorContext createActorContext(ActorRef actorRef) {
805 return new MockRaftActorContext("test", getSystem(), actorRef);
// Serializes the given map with Java object serialization and returns the bytes as a
// ByteString. An IOException during serialization fails the calling test through Assert.fail.
808 private ByteString toByteString(Map<String, String> state) {
809 ByteArrayOutputStream b = null;
810 ObjectOutputStream o = null;
813 b = new ByteArrayOutputStream();
814 o = new ObjectOutputStream(b);
815 o.writeObject(state);
// copy the serialized form out of the in-memory buffer
816 byte[] snapshotBytes = b.toByteArray();
817 return ByteString.copyFrom(snapshotBytes);
827 } catch (IOException e) {
828 Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
// Message-collecting actor that, after recording each message via super.onReceive, also
// passes it (with the original sender) to a behavior configured through setBehavior.
// NOTE(review): 'behavior' is a static field, so every instance of this actor forwards to
// whichever behavior was set last - tests must call setBehavior before sending messages.
833 public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
834 private static AbstractRaftActorBehavior behavior;
836 public ForwardMessageToBehaviorActor(){
840 @Override public void onReceive(Object message) throws Exception {
// record first, then let the configured behavior react to the message
841 super.onReceive(message);
842 behavior.handleMessage(sender(), message);
// Sets the behavior that all instances forward received messages to.
845 public static void setBehavior(AbstractRaftActorBehavior behavior){
846 ForwardMessageToBehaviorActor.behavior = behavior;
// Wires a Leader to a real Follower behavior (through ForwardMessageToBehaviorActor), giving
// both the same generated log and commit index 1. After one heartbeat the follower must have
// received an AppendEntries with leader commit 1, first entry index 1 and previous log
// index 0, and the leader's actor must have collected an AppendEntriesReply reporting last
// log index 2 and last log term 1.
851 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
852 new JavaTestKit(getSystem()) {{
// collector actor (shadows the class-level leaderActor field) so the follower's reply can be read back
854 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
856 MockRaftActorContext leaderActorContext =
857 new MockRaftActorContext("leader", getSystem(), leaderActor);
859 ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
861 MockRaftActorContext followerActorContext =
862 new MockRaftActorContext("follower", getSystem(), followerActor);
864 Follower follower = new Follower(followerActorContext);
// messages received by followerActor are forwarded to this Follower behavior
866 ForwardMessageToBehaviorActor.setBehavior(follower);
868 Map<String, String> peerAddresses = new HashMap<>();
869 peerAddresses.put(followerActor.path().toString(),
870 followerActor.path().toString());
872 leaderActorContext.setPeerAddresses(peerAddresses);
874 leaderActorContext.getReplicatedLog().removeFrom(0);
877 leaderActorContext.setReplicatedLog(
878 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
880 leaderActorContext.setCommitIndex(1);
882 followerActorContext.getReplicatedLog().removeFrom(0);
884 // follower too has the exact same log entries and has the same commit index
885 followerActorContext.setReplicatedLog(
886 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
888 followerActorContext.setCommitIndex(1);
890 Leader leader = new Leader(leaderActorContext);
891 leader.markFollowerActive(followerActor.path().toString());
// wait out one heartbeat interval before triggering the heartbeat
893 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
894 TimeUnit.MILLISECONDS);
896 leader.handleMessage(leaderActor, new SendHeartBeat());
898 AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
899 .getFirstMatching(followerActor, AppendEntries.class);
901 assertNotNull(appendEntries);
903 assertEquals(1, appendEntries.getLeaderCommit());
904 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
905 assertEquals(0, appendEntries.getPrevLogIndex());
907 AppendEntriesReply appendEntriesReply =
908 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
909 leaderActor, AppendEntriesReply.class);
911 assertNotNull(appendEntriesReply);
913 // follower reports the last index and term of its log
914 assertEquals(2, appendEntriesReply.getLogLastIndex());
915 assertEquals(1, appendEntriesReply.getLogLastTerm());
// Wires a Leader to a real Follower behavior (through ForwardMessageToBehaviorActor) with
// identical generated logs, but with the follower's commit index (2) ahead of the leader's
// (1). After one heartbeat the follower must have received an AppendEntries with leader
// commit 1, first entry index 1 and previous log index 0, and the leader's actor must have
// collected an AppendEntriesReply reporting last log index 2 and last log term 1.
922 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
923 new JavaTestKit(getSystem()) {{
// collector actor (shadows the class-level leaderActor field) so the follower's reply can be read back
925 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
927 MockRaftActorContext leaderActorContext =
928 new MockRaftActorContext("leader", getSystem(), leaderActor);
930 ActorRef followerActor = getSystem().actorOf(
931 Props.create(ForwardMessageToBehaviorActor.class));
933 MockRaftActorContext followerActorContext =
934 new MockRaftActorContext("follower", getSystem(), followerActor);
936 Follower follower = new Follower(followerActorContext);
// messages received by followerActor are forwarded to this Follower behavior
938 ForwardMessageToBehaviorActor.setBehavior(follower);
940 Map<String, String> peerAddresses = new HashMap<>();
941 peerAddresses.put(followerActor.path().toString(),
942 followerActor.path().toString());
944 leaderActorContext.setPeerAddresses(peerAddresses);
946 leaderActorContext.getReplicatedLog().removeFrom(0);
948 leaderActorContext.setReplicatedLog(
949 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
951 leaderActorContext.setCommitIndex(1);
953 followerActorContext.getReplicatedLog().removeFrom(0);
955 followerActorContext.setReplicatedLog(
956 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
958 // follower has the same log entries but its commit index > leaders commit index
959 followerActorContext.setCommitIndex(2);
961 Leader leader = new Leader(leaderActorContext);
962 leader.markFollowerActive(followerActor.path().toString());
// wait out one heartbeat interval, using the same uninterruptible sleep helper as the other tests in this class
964 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
966 leader.handleMessage(leaderActor, new SendHeartBeat());
968 AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
969 .getFirstMatching(followerActor, AppendEntries.class);
971 assertNotNull(appendEntries);
973 assertEquals(1, appendEntries.getLeaderCommit());
974 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
975 assertEquals(0, appendEntries.getPrevLogIndex());
977 AppendEntriesReply appendEntriesReply =
978 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
979 leaderActor, AppendEntriesReply.class);
981 assertNotNull(appendEntriesReply);
983 assertEquals(2, appendEntriesReply.getLogLastIndex());
984 assertEquals(1, appendEntriesReply.getLogLastTerm());
// Calls handleAppendEntriesReply directly with a reply from "follower-1" built with 'false'
// (presumably the success flag - confirm against AppendEntriesReply) and checks that the
// returned behavior still reports RaftState.Leader.
990 public void testHandleAppendEntriesReplyFailure(){
991 new JavaTestKit(getSystem()) {
994 ActorRef leaderActor =
995 getSystem().actorOf(Props.create(MessageCollectorActor.class));
997 ActorRef followerActor =
998 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1001 MockRaftActorContext leaderActorContext =
1002 new MockRaftActorContext("leader", getSystem(), leaderActor);
1004 Map<String, String> peerAddresses = new HashMap<>();
// here the peer id is the symbolic name "follower-1" rather than the actor path
1005 peerAddresses.put("follower-1",
1006 followerActor.path().toString());
1008 leaderActorContext.setPeerAddresses(peerAddresses);
1010 Leader leader = new Leader(leaderActorContext);
1012 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1014 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1016 assertEquals(RaftState.Leader, raftActorBehavior.state());
1022 public void testHandleAppendEntriesReplySuccess() throws Exception {
1023 new JavaTestKit(getSystem()) {
1026 ActorRef leaderActor =
1027 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1029 ActorRef followerActor =
1030 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1033 MockRaftActorContext leaderActorContext =
1034 new MockRaftActorContext("leader", getSystem(), leaderActor);
1036 leaderActorContext.setReplicatedLog(
1037 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1039 Map<String, String> peerAddresses = new HashMap<>();
1040 peerAddresses.put("follower-1",
1041 followerActor.path().toString());
1043 leaderActorContext.setPeerAddresses(peerAddresses);
1044 leaderActorContext.setCommitIndex(1);
1045 leaderActorContext.setLastApplied(1);
1046 leaderActorContext.getTermInformation().update(1, "leader");
1048 Leader leader = new Leader(leaderActorContext);
1050 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1052 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1054 assertEquals(RaftState.Leader, raftActorBehavior.state());
1056 assertEquals(2, leaderActorContext.getCommitIndex());
1058 ApplyLogEntries applyLogEntries =
1059 (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
1060 ApplyLogEntries.class);
1062 assertNotNull(applyLogEntries);
1064 assertEquals(2, leaderActorContext.getLastApplied());
1066 assertEquals(2, applyLogEntries.getToIndex());
1068 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1071 assertEquals(1,applyStateList.size());
1073 ApplyState applyState = (ApplyState) applyStateList.get(0);
1075 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1081 public void testHandleAppendEntriesReplyUnknownFollower(){
1082 new JavaTestKit(getSystem()) {
1085 ActorRef leaderActor =
1086 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1088 MockRaftActorContext leaderActorContext =
1089 new MockRaftActorContext("leader", getSystem(), leaderActor);
1091 Leader leader = new Leader(leaderActorContext);
1093 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1095 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1097 assertEquals(RaftState.Leader, raftActorBehavior.state());
1103 public void testHandleRequestVoteReply(){
1104 new JavaTestKit(getSystem()) {
1107 ActorRef leaderActor =
1108 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1110 MockRaftActorContext leaderActorContext =
1111 new MockRaftActorContext("leader", getSystem(), leaderActor);
1113 Leader leader = new Leader(leaderActorContext);
1115 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1117 assertEquals(RaftState.Leader, raftActorBehavior.state());
1119 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1121 assertEquals(RaftState.Leader, raftActorBehavior.state());
1126 public void testIsolatedLeaderCheckNoFollowers() {
1127 new JavaTestKit(getSystem()) {{
1128 ActorRef leaderActor = getTestActor();
1130 MockRaftActorContext leaderActorContext =
1131 new MockRaftActorContext("leader", getSystem(), leaderActor);
1133 Map<String, String> peerAddresses = new HashMap<>();
1134 leaderActorContext.setPeerAddresses(peerAddresses);
1136 Leader leader = new Leader(leaderActorContext);
1137 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1138 Assert.assertTrue(behavior instanceof Leader);
1143 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1144 new JavaTestKit(getSystem()) {{
1146 ActorRef followerActor1 = getTestActor();
1147 ActorRef followerActor2 = getTestActor();
1149 MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1151 Map<String, String> peerAddresses = new HashMap<>();
1152 peerAddresses.put("follower-1", followerActor1.path().toString());
1153 peerAddresses.put("follower-2", followerActor2.path().toString());
1155 leaderActorContext.setPeerAddresses(peerAddresses);
1157 Leader leader = new Leader(leaderActorContext);
1158 leader.stopIsolatedLeaderCheckSchedule();
1160 leader.markFollowerActive("follower-1");
1161 leader.markFollowerActive("follower-2");
1162 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1163 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1164 behavior instanceof Leader);
1166 // kill 1 follower and verify if that got killed
1167 final JavaTestKit probe = new JavaTestKit(getSystem());
1168 probe.watch(followerActor1);
1169 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1170 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1171 assertEquals(termMsg1.getActor(), followerActor1);
1173 leader.markFollowerInActive("follower-1");
1174 leader.markFollowerActive("follower-2");
1175 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1176 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1177 behavior instanceof Leader);
1179 // kill 2nd follower and leader should change to Isolated leader
1180 followerActor2.tell(PoisonPill.getInstance(), null);
1181 probe.watch(followerActor2);
1182 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1183 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1184 assertEquals(termMsg2.getActor(), followerActor2);
1186 leader.markFollowerInActive("follower-2");
1187 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1188 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1189 behavior instanceof IsolatedLeader);
1196 public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1197 new JavaTestKit(getSystem()) {{
1199 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
1201 MockRaftActorContext leaderActorContext =
1202 new MockRaftActorContext("leader", getSystem(), leaderActor);
1204 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1205 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1206 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1208 leaderActorContext.setConfigParams(configParams);
1210 ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
1212 MockRaftActorContext followerActorContext =
1213 new MockRaftActorContext("follower-reply", getSystem(), followerActor);
1215 followerActorContext.setConfigParams(configParams);
1217 Follower follower = new Follower(followerActorContext);
1219 ForwardMessageToBehaviorActor.setBehavior(follower);
1221 Map<String, String> peerAddresses = new HashMap<>();
1222 peerAddresses.put("follower-reply",
1223 followerActor.path().toString());
1225 leaderActorContext.setPeerAddresses(peerAddresses);
1227 leaderActorContext.getReplicatedLog().removeFrom(0);
1230 leaderActorContext.setReplicatedLog(
1231 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1233 leaderActorContext.setCommitIndex(1);
1235 Leader leader = new Leader(leaderActorContext);
1236 leader.markFollowerActive("follower-reply");
1238 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1239 TimeUnit.MILLISECONDS);
1241 leader.handleMessage(leaderActor, new SendHeartBeat());
1243 AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
1244 .getFirstMatching(followerActor, AppendEntries.class);
1246 assertNotNull(appendEntries);
1248 assertEquals(1, appendEntries.getLeaderCommit());
1249 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1250 assertEquals(0, appendEntries.getPrevLogIndex());
1252 AppendEntriesReply appendEntriesReply =
1253 (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
1255 assertNotNull(appendEntriesReply);
1257 leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1259 List<Object> entries = ForwardMessageToBehaviorActor
1260 .getAllMatching(followerActor, AppendEntries.class);
1262 assertEquals("AppendEntries count should be 2 ", 2, entries.size());
1264 AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
1266 assertEquals(1, appendEntriesSecond.getLeaderCommit());
1267 assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
1268 assertEquals(1, appendEntriesSecond.getPrevLogIndex());
1273 class MockLeader extends Leader {
1275 FollowerToSnapshot fts;
1277 public MockLeader(RaftActorContext context){
1281 public FollowerToSnapshot getFollowerToSnapshot() {
1285 public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1286 fts = new FollowerToSnapshot(bs);
1287 setFollowerSnapshot(followerId, fts);
// Test configuration whose election timeout and snapshot chunk size are fixed by the
// values passed to the constructor.
1291 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
// Election timeout in milliseconds, returned (wrapped) by getElectionTimeOutInterval().
1293 private final long electionTimeOutIntervalMillis;
// Value returned by getSnapshotChunkSize().
1294 private final int snapshotChunkSize;
// Stores both arguments as given; the visible lines perform no validation.
1296 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1298 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1299 this.snapshotChunkSize = snapshotChunkSize;
// Wraps the stored millisecond value in a new FiniteDuration on every call.
// NOTE(review): presumably overrides the DefaultConfigParamsImpl getter of the same name - confirm.
1303 public FiniteDuration getElectionTimeOutInterval() {
1304 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
// Returns the chunk size given to the constructor.
// NOTE(review): presumably overrides the DefaultConfigParamsImpl getter of the same name - confirm.
1308 public int getSnapshotChunkSize() {
1309 return snapshotChunkSize;