1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.util.HashMap;
19 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Assert;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftState;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
47 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
48 import scala.concurrent.duration.FiniteDuration;
50 public class LeaderTest extends AbstractRaftActorBehaviorTest {
52 private ActorRef leaderActor =
53 getSystem().actorOf(Props.create(DoNothingActor.class));
54 private ActorRef senderActor =
55 getSystem().actorOf(Props.create(DoNothingActor.class));
58 public void testHandleMessageForUnknownMessage() throws Exception {
59 new JavaTestKit(getSystem()) {{
61 new Leader(createActorContext());
63 // handle message should return the Leader state when it receives an
65 RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
66 Assert.assertTrue(behavior instanceof Leader);
71 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
72 new JavaTestKit(getSystem()) {{
74 new Within(duration("1 seconds")) {
76 protected void run() {
78 ActorRef followerActor = getTestActor();
80 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
82 Map<String, String> peerAddresses = new HashMap<>();
84 peerAddresses.put(followerActor.path().toString(),
85 followerActor.path().toString());
87 actorContext.setPeerAddresses(peerAddresses);
89 Leader leader = new Leader(actorContext);
90 leader.handleMessage(senderActor, new SendHeartBeat());
93 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
94 // do not put code outside this method, will run afterwards
96 protected String match(Object in) {
97 Object msg = fromSerializableMessage(in);
98 if (msg instanceof AppendEntries) {
99 if (((AppendEntries)msg).getTerm() == 0) {
107 }.get(); // this extracts the received message
109 assertEquals("match", out);
117 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
118 new JavaTestKit(getSystem()) {{
120 new Within(duration("1 seconds")) {
122 protected void run() {
124 ActorRef followerActor = getTestActor();
126 MockRaftActorContext actorContext =
127 (MockRaftActorContext) createActorContext();
129 Map<String, String> peerAddresses = new HashMap<>();
131 peerAddresses.put(followerActor.path().toString(),
132 followerActor.path().toString());
134 actorContext.setPeerAddresses(peerAddresses);
136 Leader leader = new Leader(actorContext);
137 RaftActorBehavior raftBehavior = leader
138 .handleMessage(senderActor, new Replicate(null, null,
139 new MockRaftActorContext.MockReplicatedLogEntry(1,
141 new MockRaftActorContext.MockPayload("foo"))
144 // State should not change
145 assertTrue(raftBehavior instanceof Leader);
148 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
149 // do not put code outside this method, will run afterwards
151 protected String match(Object in) {
152 Object msg = fromSerializableMessage(in);
153 if (msg instanceof AppendEntries) {
154 if (((AppendEntries)msg).getTerm() == 0) {
162 }.get(); // this extracts the received message
164 assertEquals("match", out);
171 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
172 new JavaTestKit(getSystem()) {{
174 new Within(duration("1 seconds")) {
176 protected void run() {
178 ActorRef raftActor = getTestActor();
180 MockRaftActorContext actorContext =
181 new MockRaftActorContext("test", getSystem(), raftActor);
183 actorContext.getReplicatedLog().removeFrom(0);
185 actorContext.setReplicatedLog(
186 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
189 Leader leader = new Leader(actorContext);
190 RaftActorBehavior raftBehavior = leader
191 .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
193 // State should not change
194 assertTrue(raftBehavior instanceof Leader);
196 assertEquals(1, actorContext.getCommitIndex());
199 new ExpectMsg<String>(duration("1 seconds"),
201 // do not put code outside this method, will run afterwards
203 protected String match(Object in) {
204 if (in instanceof ApplyState) {
205 if (((ApplyState) in).getIdentifier().equals("state-id")) {
213 }.get(); // this extracts the received message
215 assertEquals("match", out);
223 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
224 new JavaTestKit(getSystem()) {{
225 ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
227 Map<String, String> peerAddresses = new HashMap<>();
228 peerAddresses.put(followerActor.path().toString(),
229 followerActor.path().toString());
231 MockRaftActorContext actorContext =
232 (MockRaftActorContext) createActorContext(leaderActor);
233 actorContext.setPeerAddresses(peerAddresses);
235 Map<String, String> leadersSnapshot = new HashMap<>();
236 leadersSnapshot.put("1", "A");
237 leadersSnapshot.put("2", "B");
238 leadersSnapshot.put("3", "C");
241 actorContext.getReplicatedLog().removeFrom(0);
243 final int followersLastIndex = 2;
244 final int snapshotIndex = 3;
245 final int newEntryIndex = 4;
246 final int snapshotTerm = 1;
247 final int currentTerm = 2;
249 // set the snapshot variables in replicatedlog
250 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
251 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
252 actorContext.setCommitIndex(followersLastIndex);
253 //set follower timeout to 2 mins, helps during debugging
254 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
256 MockLeader leader = new MockLeader(actorContext);
259 ReplicatedLogImplEntry entry =
260 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
261 new MockRaftActorContext.MockPayload("D"));
263 //update follower timestamp
264 leader.markFollowerActive(followerActor.path().toString());
266 ByteString bs = toByteString(leadersSnapshot);
267 leader.setSnapshot(Optional.of(bs));
268 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
270 //send first chunk and no InstallSnapshotReply received yet
271 leader.getFollowerToSnapshot().getNextChunk();
272 leader.getFollowerToSnapshot().incrementChunkIndex();
274 leader.handleMessage(leaderActor, new SendHeartBeat());
276 AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
277 followerActor, AppendEntries.SERIALIZABLE_CLASS);
279 assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
280 "received", aeproto);
282 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
284 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
286 //InstallSnapshotReply received
287 leader.getFollowerToSnapshot().markSendStatus(true);
289 leader.handleMessage(senderActor, new SendHeartBeat());
291 InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
292 MessageCollectorActor.getFirstMatching(followerActor,
293 InstallSnapshot.SERIALIZABLE_CLASS);
295 assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
298 InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
300 assertEquals(snapshotIndex, is.getLastIncludedIndex());
306 public void testSendAppendEntriesSnapshotScenario() {
307 new JavaTestKit(getSystem()) {{
309 ActorRef followerActor = getTestActor();
311 Map<String, String> peerAddresses = new HashMap<>();
312 peerAddresses.put(followerActor.path().toString(),
313 followerActor.path().toString());
315 MockRaftActorContext actorContext =
316 (MockRaftActorContext) createActorContext(getRef());
317 actorContext.setPeerAddresses(peerAddresses);
319 Map<String, String> leadersSnapshot = new HashMap<>();
320 leadersSnapshot.put("1", "A");
321 leadersSnapshot.put("2", "B");
322 leadersSnapshot.put("3", "C");
325 actorContext.getReplicatedLog().removeFrom(0);
327 final int followersLastIndex = 2;
328 final int snapshotIndex = 3;
329 final int newEntryIndex = 4;
330 final int snapshotTerm = 1;
331 final int currentTerm = 2;
333 // set the snapshot variables in replicatedlog
334 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
335 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
336 actorContext.setCommitIndex(followersLastIndex);
338 Leader leader = new Leader(actorContext);
341 ReplicatedLogImplEntry entry =
342 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
343 new MockRaftActorContext.MockPayload("D"));
345 //update follower timestamp
346 leader.markFollowerActive(followerActor.path().toString());
348 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
349 RaftActorBehavior raftBehavior = leader.handleMessage(
350 senderActor, new Replicate(null, "state-id", entry));
352 assertTrue(raftBehavior instanceof Leader);
354 // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
355 Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
357 protected Boolean match(Object o) throws Exception {
358 if (o instanceof InitiateInstallSnapshot) {
365 boolean initiateInitiateInstallSnapshot = false;
366 for (Boolean b: matches) {
367 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
370 assertTrue(initiateInitiateInstallSnapshot);
375 public void testInitiateInstallSnapshot() throws Exception {
376 new JavaTestKit(getSystem()) {{
378 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
380 ActorRef followerActor = getTestActor();
382 Map<String, String> peerAddresses = new HashMap<>();
383 peerAddresses.put(followerActor.path().toString(),
384 followerActor.path().toString());
387 MockRaftActorContext actorContext =
388 (MockRaftActorContext) createActorContext(leaderActor);
389 actorContext.setPeerAddresses(peerAddresses);
391 Map<String, String> leadersSnapshot = new HashMap<>();
392 leadersSnapshot.put("1", "A");
393 leadersSnapshot.put("2", "B");
394 leadersSnapshot.put("3", "C");
397 actorContext.getReplicatedLog().removeFrom(0);
399 final int followersLastIndex = 2;
400 final int snapshotIndex = 3;
401 final int newEntryIndex = 4;
402 final int snapshotTerm = 1;
403 final int currentTerm = 2;
405 // set the snapshot variables in replicatedlog
406 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
407 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
408 actorContext.setLastApplied(3);
409 actorContext.setCommitIndex(followersLastIndex);
411 Leader leader = new Leader(actorContext);
412 // set the snapshot as absent and check if capture-snapshot is invoked.
413 leader.setSnapshot(Optional.<ByteString>absent());
416 ReplicatedLogImplEntry entry =
417 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
418 new MockRaftActorContext.MockPayload("D"));
420 actorContext.getReplicatedLog().append(entry);
422 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
423 RaftActorBehavior raftBehavior = leader.handleMessage(
424 leaderActor, new InitiateInstallSnapshot());
426 CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
427 getFirstMatching(leaderActor, CaptureSnapshot.class);
431 assertTrue(cs.isInstallSnapshotInitiated());
432 assertEquals(3, cs.getLastAppliedIndex());
433 assertEquals(1, cs.getLastAppliedTerm());
434 assertEquals(4, cs.getLastIndex());
435 assertEquals(2, cs.getLastTerm());
440 public void testInstallSnapshot() {
441 new JavaTestKit(getSystem()) {{
443 ActorRef followerActor = getTestActor();
445 Map<String, String> peerAddresses = new HashMap<>();
446 peerAddresses.put(followerActor.path().toString(),
447 followerActor.path().toString());
449 MockRaftActorContext actorContext =
450 (MockRaftActorContext) createActorContext();
451 actorContext.setPeerAddresses(peerAddresses);
454 Map<String, String> leadersSnapshot = new HashMap<>();
455 leadersSnapshot.put("1", "A");
456 leadersSnapshot.put("2", "B");
457 leadersSnapshot.put("3", "C");
460 actorContext.getReplicatedLog().removeFrom(0);
462 final int followersLastIndex = 2;
463 final int snapshotIndex = 3;
464 final int newEntryIndex = 4;
465 final int snapshotTerm = 1;
466 final int currentTerm = 2;
468 // set the snapshot variables in replicatedlog
469 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
470 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
471 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
472 actorContext.setCommitIndex(followersLastIndex);
474 Leader leader = new Leader(actorContext);
477 ReplicatedLogImplEntry entry =
478 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
479 new MockRaftActorContext.MockPayload("D"));
481 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
482 new SendInstallSnapshot(toByteString(leadersSnapshot)));
484 assertTrue(raftBehavior instanceof Leader);
486 // check if installsnapshot gets called with the correct values.
488 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
489 // do not put code outside this method, will run afterwards
491 protected String match(Object in) {
492 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
493 InstallSnapshot is = (InstallSnapshot)
494 SerializationUtils.fromSerializable(in);
495 if (is.getData() == null) {
496 return "InstallSnapshot data is null";
498 if (is.getLastIncludedIndex() != snapshotIndex) {
499 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
501 if (is.getLastIncludedTerm() != snapshotTerm) {
502 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
504 if (is.getTerm() == currentTerm) {
505 return is.getTerm() + "!=" + currentTerm;
511 return "message mismatch:" + in.getClass();
514 }.get(); // this extracts the received message
516 assertEquals("match", out);
521 public void testHandleInstallSnapshotReplyLastChunk() {
522 new JavaTestKit(getSystem()) {{
524 ActorRef followerActor = getTestActor();
526 Map<String, String> peerAddresses = new HashMap<>();
527 peerAddresses.put(followerActor.path().toString(),
528 followerActor.path().toString());
530 final int followersLastIndex = 2;
531 final int snapshotIndex = 3;
532 final int newEntryIndex = 4;
533 final int snapshotTerm = 1;
534 final int currentTerm = 2;
536 MockRaftActorContext actorContext =
537 (MockRaftActorContext) createActorContext();
538 actorContext.setPeerAddresses(peerAddresses);
539 actorContext.setCommitIndex(followersLastIndex);
541 MockLeader leader = new MockLeader(actorContext);
543 Map<String, String> leadersSnapshot = new HashMap<>();
544 leadersSnapshot.put("1", "A");
545 leadersSnapshot.put("2", "B");
546 leadersSnapshot.put("3", "C");
548 // set the snapshot variables in replicatedlog
550 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
551 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
552 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
554 ByteString bs = toByteString(leadersSnapshot);
555 leader.setSnapshot(Optional.of(bs));
556 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
557 while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
558 leader.getFollowerToSnapshot().getNextChunk();
559 leader.getFollowerToSnapshot().incrementChunkIndex();
563 actorContext.getReplicatedLog().removeFrom(0);
565 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
566 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
567 leader.getFollowerToSnapshot().getChunkIndex(), true));
569 assertTrue(raftBehavior instanceof Leader);
571 assertEquals(0, leader.followerSnapshotSize());
572 assertEquals(1, leader.followerLogSize());
573 assertNotNull(leader.getFollower(followerActor.path().toString()));
574 FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
575 assertEquals(snapshotIndex, fli.getMatchIndex());
576 assertEquals(snapshotIndex, fli.getMatchIndex());
577 assertEquals(snapshotIndex + 1, fli.getNextIndex());
582 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
583 new JavaTestKit(getSystem()) {{
585 TestActorRef<MessageCollectorActor> followerActor =
586 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
588 Map<String, String> peerAddresses = new HashMap<>();
589 peerAddresses.put(followerActor.path().toString(),
590 followerActor.path().toString());
592 final int followersLastIndex = 2;
593 final int snapshotIndex = 3;
594 final int snapshotTerm = 1;
595 final int currentTerm = 2;
597 MockRaftActorContext actorContext =
598 (MockRaftActorContext) createActorContext();
600 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
602 public int getSnapshotChunkSize() {
606 actorContext.setPeerAddresses(peerAddresses);
607 actorContext.setCommitIndex(followersLastIndex);
609 MockLeader leader = new MockLeader(actorContext);
611 Map<String, String> leadersSnapshot = new HashMap<>();
612 leadersSnapshot.put("1", "A");
613 leadersSnapshot.put("2", "B");
614 leadersSnapshot.put("3", "C");
616 // set the snapshot variables in replicatedlog
617 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
618 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
619 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
621 ByteString bs = toByteString(leadersSnapshot);
622 leader.setSnapshot(Optional.of(bs));
624 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
626 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
628 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
630 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
632 assertEquals(1, installSnapshot.getChunkIndex());
633 assertEquals(3, installSnapshot.getTotalChunks());
636 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
638 leader.handleMessage(leaderActor, new SendHeartBeat());
640 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
642 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
644 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
646 assertEquals(1, installSnapshot.getChunkIndex());
647 assertEquals(3, installSnapshot.getTotalChunks());
649 followerActor.tell(PoisonPill.getInstance(), getRef());
654 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
655 new JavaTestKit(getSystem()) {
658 TestActorRef<MessageCollectorActor> followerActor =
659 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
661 Map<String, String> peerAddresses = new HashMap<>();
662 peerAddresses.put(followerActor.path().toString(),
663 followerActor.path().toString());
665 final int followersLastIndex = 2;
666 final int snapshotIndex = 3;
667 final int snapshotTerm = 1;
668 final int currentTerm = 2;
670 MockRaftActorContext actorContext =
671 (MockRaftActorContext) createActorContext();
673 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
675 public int getSnapshotChunkSize() {
679 actorContext.setPeerAddresses(peerAddresses);
680 actorContext.setCommitIndex(followersLastIndex);
682 MockLeader leader = new MockLeader(actorContext);
684 Map<String, String> leadersSnapshot = new HashMap<>();
685 leadersSnapshot.put("1", "A");
686 leadersSnapshot.put("2", "B");
687 leadersSnapshot.put("3", "C");
689 // set the snapshot variables in replicatedlog
690 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
691 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
692 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
694 ByteString bs = toByteString(leadersSnapshot);
695 leader.setSnapshot(Optional.of(bs));
697 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
699 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
701 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
703 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
705 assertEquals(1, installSnapshot.getChunkIndex());
706 assertEquals(3, installSnapshot.getTotalChunks());
707 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
709 int hashCode = installSnapshot.getData().hashCode();
711 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
713 leader.handleMessage(leaderActor, new SendHeartBeat());
715 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
717 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
719 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
721 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
723 assertEquals(2, installSnapshot.getChunkIndex());
724 assertEquals(3, installSnapshot.getTotalChunks());
725 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
727 followerActor.tell(PoisonPill.getInstance(), getRef());
732 public void testFollowerToSnapshotLogic() {
734 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
736 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
738 public int getSnapshotChunkSize() {
743 MockLeader leader = new MockLeader(actorContext);
745 Map<String, String> leadersSnapshot = new HashMap<>();
746 leadersSnapshot.put("1", "A");
747 leadersSnapshot.put("2", "B");
748 leadersSnapshot.put("3", "C");
750 ByteString bs = toByteString(leadersSnapshot);
751 byte[] barray = bs.toByteArray();
753 leader.createFollowerToSnapshot("followerId", bs);
754 assertEquals(bs.size(), barray.length);
757 for (int i=0; i < barray.length; i = i + 50) {
761 if (i + 50 > barray.length) {
765 ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
766 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
767 assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
769 leader.getFollowerToSnapshot().markSendStatus(true);
770 if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
771 leader.getFollowerToSnapshot().incrementChunkIndex();
775 assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
779 @Override protected RaftActorBehavior createBehavior(
780 RaftActorContext actorContext) {
781 return new Leader(actorContext);
784 @Override protected RaftActorContext createActorContext() {
785 return createActorContext(leaderActor);
789 protected RaftActorContext createActorContext(ActorRef actorRef) {
790 return new MockRaftActorContext("test", getSystem(), actorRef);
793 private ByteString toByteString(Map<String, String> state) {
794 ByteArrayOutputStream b = null;
795 ObjectOutputStream o = null;
798 b = new ByteArrayOutputStream();
799 o = new ObjectOutputStream(b);
800 o.writeObject(state);
801 byte[] snapshotBytes = b.toByteArray();
802 return ByteString.copyFrom(snapshotBytes);
812 } catch (IOException e) {
813 Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
818 public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
819 private static AbstractRaftActorBehavior behavior;
821 public ForwardMessageToBehaviorActor(){
825 @Override public void onReceive(Object message) throws Exception {
826 super.onReceive(message);
827 behavior.handleMessage(sender(), message);
830 public static void setBehavior(AbstractRaftActorBehavior behavior){
831 ForwardMessageToBehaviorActor.behavior = behavior;
836 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
837 new JavaTestKit(getSystem()) {{
839 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
841 MockRaftActorContext leaderActorContext =
842 new MockRaftActorContext("leader", getSystem(), leaderActor);
844 ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
846 MockRaftActorContext followerActorContext =
847 new MockRaftActorContext("follower", getSystem(), followerActor);
849 Follower follower = new Follower(followerActorContext);
851 ForwardMessageToBehaviorActor.setBehavior(follower);
853 Map<String, String> peerAddresses = new HashMap<>();
854 peerAddresses.put(followerActor.path().toString(),
855 followerActor.path().toString());
857 leaderActorContext.setPeerAddresses(peerAddresses);
859 leaderActorContext.getReplicatedLog().removeFrom(0);
862 leaderActorContext.setReplicatedLog(
863 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
865 leaderActorContext.setCommitIndex(1);
867 followerActorContext.getReplicatedLog().removeFrom(0);
869 // follower too has the exact same log entries and has the same commit index
870 followerActorContext.setReplicatedLog(
871 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
873 followerActorContext.setCommitIndex(1);
875 Leader leader = new Leader(leaderActorContext);
876 leader.markFollowerActive(followerActor.path().toString());
878 leader.handleMessage(leaderActor, new SendHeartBeat());
880 AppendEntriesMessages.AppendEntries appendEntries =
881 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
882 .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
884 assertNotNull(appendEntries);
886 assertEquals(1, appendEntries.getLeaderCommit());
887 assertEquals(1, appendEntries.getLogEntries(0).getIndex());
888 assertEquals(0, appendEntries.getPrevLogIndex());
890 AppendEntriesReply appendEntriesReply =
891 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
892 leaderActor, AppendEntriesReply.class);
894 assertNotNull(appendEntriesReply);
896 // follower returns its next index
897 assertEquals(2, appendEntriesReply.getLogLastIndex());
898 assertEquals(1, appendEntriesReply.getLogLastTerm());
905 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
906 new JavaTestKit(getSystem()) {{
908 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
910 MockRaftActorContext leaderActorContext =
911 new MockRaftActorContext("leader", getSystem(), leaderActor);
913 ActorRef followerActor = getSystem().actorOf(
914 Props.create(ForwardMessageToBehaviorActor.class));
916 MockRaftActorContext followerActorContext =
917 new MockRaftActorContext("follower", getSystem(), followerActor);
919 Follower follower = new Follower(followerActorContext);
921 ForwardMessageToBehaviorActor.setBehavior(follower);
923 Map<String, String> peerAddresses = new HashMap<>();
924 peerAddresses.put(followerActor.path().toString(),
925 followerActor.path().toString());
927 leaderActorContext.setPeerAddresses(peerAddresses);
929 leaderActorContext.getReplicatedLog().removeFrom(0);
931 leaderActorContext.setReplicatedLog(
932 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
934 leaderActorContext.setCommitIndex(1);
936 followerActorContext.getReplicatedLog().removeFrom(0);
938 followerActorContext.setReplicatedLog(
939 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
941 // follower has the same log entries but its commit index > leaders commit index
942 followerActorContext.setCommitIndex(2);
944 Leader leader = new Leader(leaderActorContext);
945 leader.markFollowerActive(followerActor.path().toString());
947 leader.handleMessage(leaderActor, new SendHeartBeat());
949 AppendEntriesMessages.AppendEntries appendEntries =
950 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
951 .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
953 assertNotNull(appendEntries);
955 assertEquals(1, appendEntries.getLeaderCommit());
956 assertEquals(1, appendEntries.getLogEntries(0).getIndex());
957 assertEquals(0, appendEntries.getPrevLogIndex());
959 AppendEntriesReply appendEntriesReply =
960 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
961 leaderActor, AppendEntriesReply.class);
963 assertNotNull(appendEntriesReply);
965 assertEquals(2, appendEntriesReply.getLogLastIndex());
966 assertEquals(1, appendEntriesReply.getLogLastTerm());
972 public void testHandleAppendEntriesReplyFailure(){
973 new JavaTestKit(getSystem()) {
976 ActorRef leaderActor =
977 getSystem().actorOf(Props.create(MessageCollectorActor.class));
979 ActorRef followerActor =
980 getSystem().actorOf(Props.create(MessageCollectorActor.class));
983 MockRaftActorContext leaderActorContext =
984 new MockRaftActorContext("leader", getSystem(), leaderActor);
986 Map<String, String> peerAddresses = new HashMap<>();
987 peerAddresses.put("follower-1",
988 followerActor.path().toString());
990 leaderActorContext.setPeerAddresses(peerAddresses);
992 Leader leader = new Leader(leaderActorContext);
994 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
996 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
998 assertEquals(RaftState.Leader, raftActorBehavior.state());
1004 public void testHandleAppendEntriesReplySuccess() throws Exception {
1005 new JavaTestKit(getSystem()) {
1008 ActorRef leaderActor =
1009 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1011 ActorRef followerActor =
1012 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1015 MockRaftActorContext leaderActorContext =
1016 new MockRaftActorContext("leader", getSystem(), leaderActor);
1018 leaderActorContext.setReplicatedLog(
1019 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1021 Map<String, String> peerAddresses = new HashMap<>();
1022 peerAddresses.put("follower-1",
1023 followerActor.path().toString());
1025 leaderActorContext.setPeerAddresses(peerAddresses);
1026 leaderActorContext.setCommitIndex(1);
1027 leaderActorContext.setLastApplied(1);
1028 leaderActorContext.getTermInformation().update(1, "leader");
1030 Leader leader = new Leader(leaderActorContext);
1032 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1034 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1036 assertEquals(RaftState.Leader, raftActorBehavior.state());
1038 assertEquals(2, leaderActorContext.getCommitIndex());
1040 ApplyLogEntries applyLogEntries =
1041 (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
1042 ApplyLogEntries.class);
1044 assertNotNull(applyLogEntries);
1046 assertEquals(2, leaderActorContext.getLastApplied());
1048 assertEquals(2, applyLogEntries.getToIndex());
1050 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1053 assertEquals(1,applyStateList.size());
1055 ApplyState applyState = (ApplyState) applyStateList.get(0);
1057 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1063 public void testHandleAppendEntriesReplyUnknownFollower(){
1064 new JavaTestKit(getSystem()) {
1067 ActorRef leaderActor =
1068 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1070 MockRaftActorContext leaderActorContext =
1071 new MockRaftActorContext("leader", getSystem(), leaderActor);
1073 Leader leader = new Leader(leaderActorContext);
1075 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1077 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1079 assertEquals(RaftState.Leader, raftActorBehavior.state());
1085 public void testHandleRequestVoteReply(){
1086 new JavaTestKit(getSystem()) {
1089 ActorRef leaderActor =
1090 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1092 MockRaftActorContext leaderActorContext =
1093 new MockRaftActorContext("leader", getSystem(), leaderActor);
1095 Leader leader = new Leader(leaderActorContext);
1097 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1099 assertEquals(RaftState.Leader, raftActorBehavior.state());
1101 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1103 assertEquals(RaftState.Leader, raftActorBehavior.state());
1108 public void testIsolatedLeaderCheckNoFollowers() {
1109 new JavaTestKit(getSystem()) {{
1110 ActorRef leaderActor = getTestActor();
1112 MockRaftActorContext leaderActorContext =
1113 new MockRaftActorContext("leader", getSystem(), leaderActor);
1115 Map<String, String> peerAddresses = new HashMap<>();
1116 leaderActorContext.setPeerAddresses(peerAddresses);
1118 Leader leader = new Leader(leaderActorContext);
1119 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1120 Assert.assertTrue(behavior instanceof Leader);
1125 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1126 new JavaTestKit(getSystem()) {{
1128 ActorRef followerActor1 = getTestActor();
1129 ActorRef followerActor2 = getTestActor();
1131 MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1133 Map<String, String> peerAddresses = new HashMap<>();
1134 peerAddresses.put("follower-1", followerActor1.path().toString());
1135 peerAddresses.put("follower-2", followerActor2.path().toString());
1137 leaderActorContext.setPeerAddresses(peerAddresses);
1139 Leader leader = new Leader(leaderActorContext);
1140 leader.stopIsolatedLeaderCheckSchedule();
1142 leader.markFollowerActive("follower-1");
1143 leader.markFollowerActive("follower-2");
1144 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1145 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1146 behavior instanceof Leader);
1148 // kill 1 follower and verify if that got killed
1149 final JavaTestKit probe = new JavaTestKit(getSystem());
1150 probe.watch(followerActor1);
1151 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1152 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1153 assertEquals(termMsg1.getActor(), followerActor1);
1155 leader.markFollowerInActive("follower-1");
1156 leader.markFollowerActive("follower-2");
1157 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1158 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1159 behavior instanceof Leader);
1161 // kill 2nd follower and leader should change to Isolated leader
1162 followerActor2.tell(PoisonPill.getInstance(), null);
1163 probe.watch(followerActor2);
1164 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1165 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1166 assertEquals(termMsg2.getActor(), followerActor2);
1168 leader.markFollowerInActive("follower-2");
1169 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1170 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1171 behavior instanceof IsolatedLeader);
1176 class MockLeader extends Leader {
1178 FollowerToSnapshot fts;
1180 public MockLeader(RaftActorContext context){
1184 public FollowerToSnapshot getFollowerToSnapshot() {
1188 public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1189 fts = new FollowerToSnapshot(bs);
1190 setFollowerSnapshot(followerId, fts);
1194 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1196 private long electionTimeOutIntervalMillis;
1197 private int snapshotChunkSize;
1199 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1201 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1202 this.snapshotChunkSize = snapshotChunkSize;
1206 public FiniteDuration getElectionTimeOutInterval() {
1207 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1211 public int getSnapshotChunkSize() {
1212 return snapshotChunkSize;