1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import akka.actor.ActorRef;
4 import akka.actor.PoisonPill;
5 import akka.actor.Props;
6 import akka.actor.Terminated;
7 import akka.testkit.JavaTestKit;
8 import akka.testkit.TestActorRef;
9 import com.google.common.base.Optional;
10 import com.google.common.util.concurrent.Uninterruptibles;
11 import com.google.protobuf.ByteString;
12 import java.io.ByteArrayOutputStream;
13 import java.io.IOException;
14 import java.io.ObjectOutputStream;
15 import java.util.HashMap;
16 import java.util.List;
18 import java.util.concurrent.TimeUnit;
19 import org.junit.Assert;
20 import org.junit.Test;
21 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
22 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
23 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
24 import org.opendaylight.controller.cluster.raft.RaftActorContext;
25 import org.opendaylight.controller.cluster.raft.RaftState;
26 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
27 import org.opendaylight.controller.cluster.raft.SerializationUtils;
28 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
30 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
31 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
32 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
33 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
34 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
36 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
37 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
38 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
40 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
41 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
42 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
43 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
44 import scala.concurrent.duration.FiniteDuration;
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertNotNull;
48 import static org.junit.Assert.assertTrue;
50 public class LeaderTest extends AbstractRaftActorBehaviorTest {
// Shared stand-in actors, both created as DoNothingActor instances.
// leaderActor is what the no-arg createActorContext() passes to the mock context;
// senderActor is used as the sender argument of handleMessage calls in the tests.
52 private final ActorRef leaderActor =
53 getSystem().actorOf(Props.create(DoNothingActor.class));
54 private final ActorRef senderActor =
55 getSystem().actorOf(Props.create(DoNothingActor.class));
// Hands the leader a message it does not understand (the plain string "foo")
// and asserts that the behavior returned is still a Leader.
58 public void testHandleMessageForUnknownMessage() throws Exception {
59 new JavaTestKit(getSystem()) {{
61 new Leader(createActorContext());
63 // handle message should return the Leader state when it receives an
65 RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
66 Assert.assertTrue(behavior instanceof Leader);
// Registers the test-kit actor as the leader's only peer, delivers a SendHeartBeat
// to the leader and expects the peer to receive an AppendEntries whose term is 0.
71 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
72 new JavaTestKit(getSystem()) {{
74 new Within(duration("1 seconds")) {
76 protected void run() {
78 ActorRef followerActor = getTestActor();
80 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
// The follower's actor path is used both as its peer id and as its address.
82 Map<String, String> peerAddresses = new HashMap<>();
84 peerAddresses.put(followerActor.path().toString(),
85 followerActor.path().toString());
87 actorContext.setPeerAddresses(peerAddresses);
89 Leader leader = new Leader(actorContext);
90 leader.handleMessage(senderActor, new SendHeartBeat());
93 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
94 // do not put code outside this method, will run afterwards
96 protected String match(Object in) {
// Messages may arrive in serialized form, so convert before inspecting the type.
97 Object msg = fromSerializableMessage(in);
98 if (msg instanceof AppendEntries) {
99 if (((AppendEntries)msg).getTerm() == 0) {
107 }.get(); // this extracts the received message
109 assertEquals("match", out);
// Sends the leader a Replicate message for a mock log entry with payload "foo",
// checks the behavior is still a Leader, and expects the single peer (the test-kit
// actor) to receive an AppendEntries whose term is 0.
117 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
118 new JavaTestKit(getSystem()) {{
120 new Within(duration("1 seconds")) {
122 protected void run() {
124 ActorRef followerActor = getTestActor();
126 MockRaftActorContext actorContext =
127 (MockRaftActorContext) createActorContext();
// The follower's actor path is used both as its peer id and as its address.
129 Map<String, String> peerAddresses = new HashMap<>();
131 peerAddresses.put(followerActor.path().toString(),
132 followerActor.path().toString());
134 actorContext.setPeerAddresses(peerAddresses);
136 Leader leader = new Leader(actorContext);
137 RaftActorBehavior raftBehavior = leader
138 .handleMessage(senderActor, new Replicate(null, null,
139 new MockRaftActorContext.MockReplicatedLogEntry(1,
141 new MockRaftActorContext.MockPayload("foo"))
144 // State should not change
145 assertTrue(raftBehavior instanceof Leader);
148 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
149 // do not put code outside this method, will run afterwards
151 protected String match(Object in) {
// Messages may arrive in serialized form, so convert before inspecting the type.
152 Object msg = fromSerializableMessage(in);
153 if (msg instanceof AppendEntries) {
154 if (((AppendEntries)msg).getTerm() == 0) {
162 }.get(); // this extracts the received message
164 assertEquals("match", out);
// Builds a context whose raft actor is the test-kit actor and sets no peer addresses
// in the visible code. After a Replicate for log entry 1 with identifier "state-id",
// the behavior must still be a Leader, the commit index must be 1, and the raft actor
// must receive an ApplyState carrying that identifier.
171 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
172 new JavaTestKit(getSystem()) {{
174 new Within(duration("1 seconds")) {
176 protected void run() {
178 ActorRef raftActor = getTestActor();
180 MockRaftActorContext actorContext =
181 new MockRaftActorContext("test", getSystem(), raftActor);
// Clear the mock's default log before installing the entries built below.
183 actorContext.getReplicatedLog().removeFrom(0);
185 actorContext.setReplicatedLog(
186 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
189 Leader leader = new Leader(actorContext);
190 RaftActorBehavior raftBehavior = leader
191 .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
193 // State should not change
194 assertTrue(raftBehavior instanceof Leader);
196 assertEquals(1, actorContext.getCommitIndex());
199 new ExpectMsg<String>(duration("1 seconds"),
201 // do not put code outside this method, will run afterwards
203 protected String match(Object in) {
204 if (in instanceof ApplyState) {
205 if (((ApplyState) in).getIdentifier().equals("state-id")) {
213 }.get(); // this extracts the received message
215 assertEquals("match", out);
// Exercises heartbeats while a snapshot install is in progress for a follower:
//  1. with the first chunk fetched but not yet acknowledged, a heartbeat must still
//     produce an AppendEntries with no entries;
//  2. once the chunk is marked as successfully sent, the next heartbeat must produce
//     an InstallSnapshot whose lastIncludedIndex equals the snapshot index.
223 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
224 new JavaTestKit(getSystem()) {{
// MessageCollectorActor records what the follower receives so it can be queried below.
225 ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
227 Map<String, String> peerAddresses = new HashMap<>();
228 peerAddresses.put(followerActor.path().toString(),
229 followerActor.path().toString());
231 MockRaftActorContext actorContext =
232 (MockRaftActorContext) createActorContext(leaderActor);
233 actorContext.setPeerAddresses(peerAddresses);
// Arbitrary map standing in for the leader's state; serialized by toByteString below.
235 Map<String, String> leadersSnapshot = new HashMap<>();
236 leadersSnapshot.put("1", "A");
237 leadersSnapshot.put("2", "B");
238 leadersSnapshot.put("3", "C");
241 actorContext.getReplicatedLog().removeFrom(0);
243 final int followersLastIndex = 2;
244 final int snapshotIndex = 3;
245 final int newEntryIndex = 4;
246 final int snapshotTerm = 1;
247 final int currentTerm = 2;
249 // set the snapshot variables in replicatedlog
250 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
251 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
252 actorContext.setCommitIndex(followersLastIndex);
253 //set follower timeout to 2 mins, helps during debugging
254 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
256 MockLeader leader = new MockLeader(actorContext);
259 ReplicatedLogImplEntry entry =
260 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
261 new MockRaftActorContext.MockPayload("D"));
263 //update follower timestamp
264 leader.markFollowerActive(followerActor.path().toString());
266 ByteString bs = toByteString(leadersSnapshot);
267 leader.setSnapshot(Optional.of(bs));
268 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
270 //send first chunk and no InstallSnapshotReply received yet
271 leader.getFollowerToSnapshot().getNextChunk();
272 leader.getFollowerToSnapshot().incrementChunkIndex();
274 leader.handleMessage(leaderActor, new SendHeartBeat());
276 AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
277 followerActor, AppendEntries.class);
279 assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
280 "received", aeproto);
282 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
284 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
286 //InstallSnapshotReply received
287 leader.getFollowerToSnapshot().markSendStatus(true);
289 leader.handleMessage(senderActor, new SendHeartBeat());
291 InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
292 MessageCollectorActor.getFirstMatching(followerActor,
293 InstallSnapshot.SERIALIZABLE_CLASS);
295 assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
298 InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
300 assertEquals(snapshotIndex, is.getLastIncludedIndex());
// Sets the log's snapshot index (3) above the commit index (2), then replicates a new
// entry at index 4. The behavior must stay a Leader, and the actor backing the context
// (the test kit's own ref) must receive an InitiateInstallSnapshot within 2 seconds.
306 public void testSendAppendEntriesSnapshotScenario() {
307 new JavaTestKit(getSystem()) {{
309 ActorRef followerActor = getTestActor();
311 Map<String, String> peerAddresses = new HashMap<>();
312 peerAddresses.put(followerActor.path().toString(),
313 followerActor.path().toString());
315 MockRaftActorContext actorContext =
316 (MockRaftActorContext) createActorContext(getRef());
317 actorContext.setPeerAddresses(peerAddresses);
// NOTE(review): leadersSnapshot is populated but not used in the visible lines of this test.
319 Map<String, String> leadersSnapshot = new HashMap<>();
320 leadersSnapshot.put("1", "A");
321 leadersSnapshot.put("2", "B");
322 leadersSnapshot.put("3", "C");
325 actorContext.getReplicatedLog().removeFrom(0);
327 final int followersLastIndex = 2;
328 final int snapshotIndex = 3;
329 final int newEntryIndex = 4;
330 final int snapshotTerm = 1;
331 final int currentTerm = 2;
333 // set the snapshot variables in replicatedlog
334 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
335 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
336 actorContext.setCommitIndex(followersLastIndex);
338 Leader leader = new Leader(actorContext);
341 ReplicatedLogImplEntry entry =
342 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
343 new MockRaftActorContext.MockPayload("D"));
345 //update follower timestamp
346 leader.markFollowerActive(followerActor.path().toString());
348 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
349 RaftActorBehavior raftBehavior = leader.handleMessage(
350 senderActor, new Replicate(null, "state-id", entry));
352 assertTrue(raftBehavior instanceof Leader);
354 // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
355 Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
357 protected Boolean match(Object o) throws Exception {
358 if (o instanceof InitiateInstallSnapshot) {
// OR together the per-message results: true if any received message matched.
365 boolean initiateInitiateInstallSnapshot = false;
366 for (Boolean b: matches) {
367 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
370 assertTrue(initiateInitiateInstallSnapshot);
// With the leader's snapshot set to absent, an InitiateInstallSnapshot must make the
// leader send a CaptureSnapshot to its own actor carrying the expected last-applied
// and last-log index/term. A second InitiateInstallSnapshot sent while the first is
// still in progress must not produce another CaptureSnapshot.
375 public void testInitiateInstallSnapshot() throws Exception {
376 new JavaTestKit(getSystem()) {{
// Local collector actor; it shadows the class-level leaderActor field within this test.
378 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
380 ActorRef followerActor = getTestActor();
382 Map<String, String> peerAddresses = new HashMap<>();
383 peerAddresses.put(followerActor.path().toString(),
384 followerActor.path().toString());
387 MockRaftActorContext actorContext =
388 (MockRaftActorContext) createActorContext(leaderActor);
389 actorContext.setPeerAddresses(peerAddresses);
// NOTE(review): leadersSnapshot is populated but not used in the visible lines of this test.
391 Map<String, String> leadersSnapshot = new HashMap<>();
392 leadersSnapshot.put("1", "A");
393 leadersSnapshot.put("2", "B");
394 leadersSnapshot.put("3", "C");
397 actorContext.getReplicatedLog().removeFrom(0);
399 final int followersLastIndex = 2;
400 final int snapshotIndex = 3;
401 final int newEntryIndex = 4;
402 final int snapshotTerm = 1;
403 final int currentTerm = 2;
405 // set the snapshot variables in replicatedlog
406 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
407 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
408 actorContext.setLastApplied(3);
409 actorContext.setCommitIndex(followersLastIndex);
411 Leader leader = new Leader(actorContext);
412 // set the snapshot as absent and check if capture-snapshot is invoked.
413 leader.setSnapshot(Optional.<ByteString>absent());
416 ReplicatedLogImplEntry entry =
417 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
418 new MockRaftActorContext.MockPayload("D"));
420 actorContext.getReplicatedLog().append(entry);
422 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
423 RaftActorBehavior raftBehavior = leader.handleMessage(
424 leaderActor, new InitiateInstallSnapshot());
426 CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
427 getFirstMatching(leaderActor, CaptureSnapshot.class);
// Expected values mirror the setup above: lastApplied 3 / snapshotTerm 1, and the
// appended entry at newEntryIndex 4 / currentTerm 2.
431 assertTrue(cs.isInstallSnapshotInitiated());
432 assertEquals(3, cs.getLastAppliedIndex());
433 assertEquals(1, cs.getLastAppliedTerm());
434 assertEquals(4, cs.getLastIndex());
435 assertEquals(2, cs.getLastTerm());
437 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
438 raftBehavior = leader.handleMessage(leaderActor, new InitiateInstallSnapshot());
439 List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
440 assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
// Sends the leader a SendInstallSnapshot carrying a serialized snapshot and expects the
// single peer (the test-kit actor) to receive an InstallSnapshot whose data is non-null
// and whose lastIncludedIndex, lastIncludedTerm and term equal snapshotIndex,
// snapshotTerm and currentTerm respectively.
446 public void testInstallSnapshot() {
447 new JavaTestKit(getSystem()) {{
449 ActorRef followerActor = getTestActor();
451 Map<String, String> peerAddresses = new HashMap<>();
452 peerAddresses.put(followerActor.path().toString(),
453 followerActor.path().toString());
455 MockRaftActorContext actorContext =
456 (MockRaftActorContext) createActorContext();
457 actorContext.setPeerAddresses(peerAddresses);
// Arbitrary map standing in for the leader's state; serialized by toByteString below.
460 Map<String, String> leadersSnapshot = new HashMap<>();
461 leadersSnapshot.put("1", "A");
462 leadersSnapshot.put("2", "B");
463 leadersSnapshot.put("3", "C");
466 actorContext.getReplicatedLog().removeFrom(0);
468 final int followersLastIndex = 2;
469 final int snapshotIndex = 3;
470 final int newEntryIndex = 4;
471 final int snapshotTerm = 1;
472 final int currentTerm = 2;
474 // set the snapshot variables in replicatedlog
475 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
476 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
477 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
478 actorContext.setCommitIndex(followersLastIndex);
480 Leader leader = new Leader(actorContext);
483 ReplicatedLogImplEntry entry =
484 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
485 new MockRaftActorContext.MockPayload("D"));
487 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
488 new SendInstallSnapshot(toByteString(leadersSnapshot)));
490 assertTrue(raftBehavior instanceof Leader);
492 // check if installsnapshot gets called with the correct values.
494 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
495 // do not put code outside this method, will run afterwards
497 protected String match(Object in) {
// Each failed check returns a diagnostic string instead of "match", so the final
// assertEquals("match", out) reports which field was wrong.
498 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
499 InstallSnapshot is = (InstallSnapshot)
500 SerializationUtils.fromSerializable(in);
501 if (is.getData() == null) {
502 return "InstallSnapshot data is null";
504 if (is.getLastIncludedIndex() != snapshotIndex) {
505 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
507 if (is.getLastIncludedTerm() != snapshotTerm) {
508 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
// Fixed: this check used '==', which reported a mismatch exactly when the term was
// correct and let a wrong term pass. It now fails on inequality like the checks above.
510 if (is.getTerm() != currentTerm) {
511 return is.getTerm() + "!=" + currentTerm;
517 return "message mismatch:" + in.getClass();
520 }.get(); // this extracts the received message
522 assertEquals("match", out);
// Advances the follower's snapshot transfer to its last chunk, then delivers a
// successful InstallSnapshotReply for that chunk. Afterwards the leader must hold no
// in-progress follower snapshot, must still track one follower, and that follower's
// match index / next index must be snapshotIndex / snapshotIndex + 1.
527 public void testHandleInstallSnapshotReplyLastChunk() {
528 new JavaTestKit(getSystem()) {{
530 ActorRef followerActor = getTestActor();
532 Map<String, String> peerAddresses = new HashMap<>();
533 peerAddresses.put(followerActor.path().toString(),
534 followerActor.path().toString());
536 final int followersLastIndex = 2;
537 final int snapshotIndex = 3;
538 final int newEntryIndex = 4;
539 final int snapshotTerm = 1;
540 final int currentTerm = 2;
542 MockRaftActorContext actorContext =
543 (MockRaftActorContext) createActorContext();
544 actorContext.setPeerAddresses(peerAddresses);
545 actorContext.setCommitIndex(followersLastIndex);
547 MockLeader leader = new MockLeader(actorContext);
// Arbitrary map standing in for the leader's state; serialized by toByteString below.
549 Map<String, String> leadersSnapshot = new HashMap<>();
550 leadersSnapshot.put("1", "A");
551 leadersSnapshot.put("2", "B");
552 leadersSnapshot.put("3", "C");
554 // set the snapshot variables in replicatedlog
556 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
557 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
558 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
560 ByteString bs = toByteString(leadersSnapshot);
561 leader.setSnapshot(Optional.of(bs));
562 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
// Step through the chunks until the current chunk index is the last one.
563 while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
564 leader.getFollowerToSnapshot().getNextChunk();
565 leader.getFollowerToSnapshot().incrementChunkIndex();
569 actorContext.getReplicatedLog().removeFrom(0);
571 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
572 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
573 leader.getFollowerToSnapshot().getChunkIndex(), true));
575 assertTrue(raftBehavior instanceof Leader);
577 assertEquals(0, leader.followerSnapshotSize());
578 assertEquals(1, leader.followerLogSize());
579 assertNotNull(leader.getFollower(followerActor.path().toString()));
580 FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
// NOTE(review): the next two assertions are identical; one is redundant or was
// perhaps meant to check a different property - confirm intent.
581 assertEquals(snapshotIndex, fli.getMatchIndex());
582 assertEquals(snapshotIndex, fli.getMatchIndex());
583 assertEquals(snapshotIndex + 1, fli.getNextIndex());
// After the first InstallSnapshot chunk (index 1 of 3) has been sent, delivers a failed
// InstallSnapshotReply with chunk index -1. The next heartbeat must send chunk index 1
// of 3 to the follower again.
588 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
589 new JavaTestKit(getSystem()) {{
// NOTE(review): the actor is created with the fixed name "follower" (also used by
// another test in this file) and is sent a PoisonPill at the end of this test.
591 TestActorRef<MessageCollectorActor> followerActor =
592 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
594 Map<String, String> peerAddresses = new HashMap<>();
595 peerAddresses.put(followerActor.path().toString(),
596 followerActor.path().toString());
598 final int followersLastIndex = 2;
599 final int snapshotIndex = 3;
600 final int snapshotTerm = 1;
601 final int currentTerm = 2;
603 MockRaftActorContext actorContext =
604 (MockRaftActorContext) createActorContext();
// Overrides the snapshot chunk size; the value is not visible here, but the
// assertions below expect the snapshot to split into 3 chunks.
606 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
608 public int getSnapshotChunkSize() {
612 actorContext.setPeerAddresses(peerAddresses);
613 actorContext.setCommitIndex(followersLastIndex);
615 MockLeader leader = new MockLeader(actorContext);
617 Map<String, String> leadersSnapshot = new HashMap<>();
618 leadersSnapshot.put("1", "A");
619 leadersSnapshot.put("2", "B");
620 leadersSnapshot.put("3", "C");
622 // set the snapshot variables in replicatedlog
623 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
624 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
625 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
627 ByteString bs = toByteString(leadersSnapshot);
628 leader.setSnapshot(Optional.of(bs));
630 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
// First message collected by the follower: the initial chunk.
632 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
634 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
636 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
638 assertEquals(1, installSnapshot.getChunkIndex());
639 assertEquals(3, installSnapshot.getTotalChunks());
// Reply with chunk index -1 and success=false.
642 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
644 leader.handleMessage(leaderActor, new SendHeartBeat());
// Second message collected by the follower: expected to be chunk 1 again.
646 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
648 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
650 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
652 assertEquals(1, installSnapshot.getChunkIndex());
653 assertEquals(3, installSnapshot.getTotalChunks());
655 followerActor.tell(PoisonPill.getInstance(), getRef());
// Checks the lastChunkHashCode carried by successive InstallSnapshot messages: the
// first chunk carries AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, and after a
// successful reply for chunk 1 the second chunk carries the hash code of the first
// chunk's data.
660 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
661 new JavaTestKit(getSystem()) {
// NOTE(review): the actor is created with the fixed name "follower" (also used by
// another test in this file) and is sent a PoisonPill at the end of this test.
664 TestActorRef<MessageCollectorActor> followerActor =
665 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
667 Map<String, String> peerAddresses = new HashMap<>();
668 peerAddresses.put(followerActor.path().toString(),
669 followerActor.path().toString());
671 final int followersLastIndex = 2;
672 final int snapshotIndex = 3;
673 final int snapshotTerm = 1;
674 final int currentTerm = 2;
676 MockRaftActorContext actorContext =
677 (MockRaftActorContext) createActorContext();
// Overrides the snapshot chunk size; the value is not visible here, but the
// assertions below expect the snapshot to split into 3 chunks.
679 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
681 public int getSnapshotChunkSize() {
685 actorContext.setPeerAddresses(peerAddresses);
686 actorContext.setCommitIndex(followersLastIndex);
688 MockLeader leader = new MockLeader(actorContext);
690 Map<String, String> leadersSnapshot = new HashMap<>();
691 leadersSnapshot.put("1", "A");
692 leadersSnapshot.put("2", "B");
693 leadersSnapshot.put("3", "C");
695 // set the snapshot variables in replicatedlog
696 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
697 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
698 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
700 ByteString bs = toByteString(leadersSnapshot);
701 leader.setSnapshot(Optional.of(bs));
703 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
705 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
707 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
709 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
711 assertEquals(1, installSnapshot.getChunkIndex());
712 assertEquals(3, installSnapshot.getTotalChunks());
713 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
// Remember the first chunk's data hash; the second chunk must report it.
715 int hashCode = installSnapshot.getData().hashCode();
717 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
719 leader.handleMessage(leaderActor, new SendHeartBeat());
// NOTE(review): fixed 100 ms pause before reading the follower's messages; presumably
// allows delivery to complete - timing-dependent.
721 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
723 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
725 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
727 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
729 assertEquals(2, installSnapshot.getChunkIndex());
730 assertEquals(3, installSnapshot.getTotalChunks());
731 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
733 followerActor.tell(PoisonPill.getInstance(), getRef());
// Walks the serialized snapshot in 50-byte steps and checks that each chunk returned by
// the leader's follower-to-snapshot tracker has the expected size and chunk index, and
// that the final chunk count equals getTotalChunks().
738 public void testFollowerToSnapshotLogic() {
740 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
// NOTE(review): the overridden chunk size is not visible here; the loop below steps
// by 50 bytes, so it presumably returns 50 - confirm.
742 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
744 public int getSnapshotChunkSize() {
749 MockLeader leader = new MockLeader(actorContext);
751 Map<String, String> leadersSnapshot = new HashMap<>();
752 leadersSnapshot.put("1", "A");
753 leadersSnapshot.put("2", "B");
754 leadersSnapshot.put("3", "C");
756 ByteString bs = toByteString(leadersSnapshot);
757 byte[] barray = bs.toByteArray();
759 leader.createFollowerToSnapshot("followerId", bs);
760 assertEquals(bs.size(), barray.length);
// i is the start offset of the expected chunk within the serialized bytes.
763 for (int i=0; i < barray.length; i = i + 50) {
// The last chunk may be shorter than 50 bytes.
767 if (i + 50 > barray.length) {
771 ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
772 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
773 assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
// Mark the chunk as sent successfully and advance unless this was the final chunk.
775 leader.getFollowerToSnapshot().markSendStatus(true);
776 if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
777 leader.getFollowerToSnapshot().incrementChunkIndex();
781 assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
// Supplies the behavior under test to the abstract base test: a Leader built on the
// given context.
785 @Override protected RaftActorBehavior createBehavior(
786 RaftActorContext actorContext) {
787 return new Leader(actorContext);
// Default context for these tests: delegates to createActorContext(ActorRef) with the
// shared leaderActor.
790 @Override protected RaftActorContext createActorContext() {
791 return createActorContext(leaderActor);
// Creates a MockRaftActorContext with id "test" on the test actor system, backed by
// the supplied actor.
795 protected RaftActorContext createActorContext(ActorRef actorRef) {
796 return new MockRaftActorContext("test", getSystem(), actorRef);
// Serializes the given map with Java object serialization and wraps the resulting
// bytes in a protobuf ByteString. An IOException during serialization fails the
// calling test via Assert.fail.
799 private ByteString toByteString(Map<String, String> state) {
800 ByteArrayOutputStream b = null;
801 ObjectOutputStream o = null;
804 b = new ByteArrayOutputStream();
805 o = new ObjectOutputStream(b);
806 o.writeObject(state);
807 byte[] snapshotBytes = b.toByteArray();
808 return ByteString.copyFrom(snapshotBytes);
818 } catch (IOException e) {
819 Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
// Test actor that records every message (via MessageCollectorActor) and then forwards
// it to a raft behavior, letting a real behavior answer messages sent to the actor.
824 public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
// NOTE(review): the behavior is held in a static field, so it is shared by all
// instances of this actor and must be set via setBehavior before messages arrive.
825 private static AbstractRaftActorBehavior behavior;
827 public ForwardMessageToBehaviorActor(){
831 @Override public void onReceive(Object message) throws Exception {
// Collect first, then let the behavior handle the message with the original sender.
832 super.onReceive(message);
833 behavior.handleMessage(sender(), message);
836 public static void setBehavior(AbstractRaftActorBehavior behavior){
837 ForwardMessageToBehaviorActor.behavior = behavior;
// Leader and follower hold identical logs (built with createEntries(0, 3, 1)) and both
// have commit index 1. On a heartbeat the follower must receive an AppendEntries with
// leaderCommit 1, first entry index 1 and prevLogIndex 0, and the leader must get back
// an AppendEntriesReply with logLastIndex 2 and logLastTerm 1.
842 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
843 new JavaTestKit(getSystem()) {{
// Local collector actor; it shadows the class-level leaderActor field within this test.
845 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
847 MockRaftActorContext leaderActorContext =
848 new MockRaftActorContext("leader", getSystem(), leaderActor);
// The follower actor forwards what it receives to a real Follower behavior (set below).
850 ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
852 MockRaftActorContext followerActorContext =
853 new MockRaftActorContext("follower", getSystem(), followerActor);
855 Follower follower = new Follower(followerActorContext);
857 ForwardMessageToBehaviorActor.setBehavior(follower);
859 Map<String, String> peerAddresses = new HashMap<>();
860 peerAddresses.put(followerActor.path().toString(),
861 followerActor.path().toString());
863 leaderActorContext.setPeerAddresses(peerAddresses);
865 leaderActorContext.getReplicatedLog().removeFrom(0);
868 leaderActorContext.setReplicatedLog(
869 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
871 leaderActorContext.setCommitIndex(1);
873 followerActorContext.getReplicatedLog().removeFrom(0);
875 // follower too has the exact same log entries and has the same commit index
876 followerActorContext.setReplicatedLog(
877 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
879 followerActorContext.setCommitIndex(1);
881 Leader leader = new Leader(leaderActorContext);
882 leader.markFollowerActive(followerActor.path().toString());
884 leader.handleMessage(leaderActor, new SendHeartBeat());
886 AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
887 .getFirstMatching(followerActor, AppendEntries.class);
889 assertNotNull(appendEntries);
891 assertEquals(1, appendEntries.getLeaderCommit());
892 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
893 assertEquals(0, appendEntries.getPrevLogIndex());
895 AppendEntriesReply appendEntriesReply =
896 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
897 leaderActor, AppendEntriesReply.class);
899 assertNotNull(appendEntriesReply);
901 // follower returns its next index
902 assertEquals(2, appendEntriesReply.getLogLastIndex());
903 assertEquals(1, appendEntriesReply.getLogLastTerm());
// Same setup as the commit-index-less-than-last-index case, except the follower's
// commit index (2) is ahead of the leader's (1). The AppendEntries sent on heartbeat
// and the AppendEntriesReply received by the leader must carry the same values as in
// that case.
910 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
911 new JavaTestKit(getSystem()) {{
// Local collector actor; it shadows the class-level leaderActor field within this test.
913 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
915 MockRaftActorContext leaderActorContext =
916 new MockRaftActorContext("leader", getSystem(), leaderActor);
// The follower actor forwards what it receives to a real Follower behavior (set below).
918 ActorRef followerActor = getSystem().actorOf(
919 Props.create(ForwardMessageToBehaviorActor.class));
921 MockRaftActorContext followerActorContext =
922 new MockRaftActorContext("follower", getSystem(), followerActor);
924 Follower follower = new Follower(followerActorContext);
926 ForwardMessageToBehaviorActor.setBehavior(follower);
928 Map<String, String> peerAddresses = new HashMap<>();
929 peerAddresses.put(followerActor.path().toString(),
930 followerActor.path().toString());
932 leaderActorContext.setPeerAddresses(peerAddresses);
934 leaderActorContext.getReplicatedLog().removeFrom(0);
936 leaderActorContext.setReplicatedLog(
937 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
939 leaderActorContext.setCommitIndex(1);
941 followerActorContext.getReplicatedLog().removeFrom(0);
943 followerActorContext.setReplicatedLog(
944 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
946 // follower has the same log entries but its commit index > leaders commit index
947 followerActorContext.setCommitIndex(2);
949 Leader leader = new Leader(leaderActorContext);
950 leader.markFollowerActive(followerActor.path().toString());
952 leader.handleMessage(leaderActor, new SendHeartBeat());
954 AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
955 .getFirstMatching(followerActor, AppendEntries.class);
957 assertNotNull(appendEntries);
959 assertEquals(1, appendEntries.getLeaderCommit());
960 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
961 assertEquals(0, appendEntries.getPrevLogIndex());
963 AppendEntriesReply appendEntriesReply =
964 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
965 leaderActor, AppendEntriesReply.class);
967 assertNotNull(appendEntriesReply);
969 assertEquals(2, appendEntriesReply.getLogLastIndex());
970 assertEquals(1, appendEntriesReply.getLogLastTerm());
// Passes an AppendEntriesReply with its boolean argument set to false (the failure
// case, per the test name) from the known peer "follower-1" directly to
// handleAppendEntriesReply and asserts that the resulting behavior is still in the
// Leader state.
976 public void testHandleAppendEntriesReplyFailure(){
977 new JavaTestKit(getSystem()) {
980 ActorRef leaderActor =
981 getSystem().actorOf(Props.create(MessageCollectorActor.class));
983 ActorRef followerActor =
984 getSystem().actorOf(Props.create(MessageCollectorActor.class));
987 MockRaftActorContext leaderActorContext =
988 new MockRaftActorContext("leader", getSystem(), leaderActor);
// Unlike most tests in this file, the peer is keyed by a symbolic id rather than its path.
990 Map<String, String> peerAddresses = new HashMap<>();
991 peerAddresses.put("follower-1",
992 followerActor.path().toString());
994 leaderActorContext.setPeerAddresses(peerAddresses);
996 Leader leader = new Leader(leaderActorContext);
998 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1000 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1002 assertEquals(RaftState.Leader, raftActorBehavior.state());
1008 public void testHandleAppendEntriesReplySuccess() throws Exception {
1009 new JavaTestKit(getSystem()) {
1012 ActorRef leaderActor =
1013 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1015 ActorRef followerActor =
1016 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1019 MockRaftActorContext leaderActorContext =
1020 new MockRaftActorContext("leader", getSystem(), leaderActor);
1022 leaderActorContext.setReplicatedLog(
1023 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1025 Map<String, String> peerAddresses = new HashMap<>();
1026 peerAddresses.put("follower-1",
1027 followerActor.path().toString());
1029 leaderActorContext.setPeerAddresses(peerAddresses);
1030 leaderActorContext.setCommitIndex(1);
1031 leaderActorContext.setLastApplied(1);
1032 leaderActorContext.getTermInformation().update(1, "leader");
1034 Leader leader = new Leader(leaderActorContext);
1036 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1038 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1040 assertEquals(RaftState.Leader, raftActorBehavior.state());
1042 assertEquals(2, leaderActorContext.getCommitIndex());
1044 ApplyLogEntries applyLogEntries =
1045 (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
1046 ApplyLogEntries.class);
1048 assertNotNull(applyLogEntries);
1050 assertEquals(2, leaderActorContext.getLastApplied());
1052 assertEquals(2, applyLogEntries.getToIndex());
1054 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1057 assertEquals(1,applyStateList.size());
1059 ApplyState applyState = (ApplyState) applyStateList.get(0);
1061 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1067 public void testHandleAppendEntriesReplyUnknownFollower(){
1068 new JavaTestKit(getSystem()) {
1071 ActorRef leaderActor =
1072 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1074 MockRaftActorContext leaderActorContext =
1075 new MockRaftActorContext("leader", getSystem(), leaderActor);
1077 Leader leader = new Leader(leaderActorContext);
1079 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1081 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1083 assertEquals(RaftState.Leader, raftActorBehavior.state());
1089 public void testHandleRequestVoteReply(){
1090 new JavaTestKit(getSystem()) {
1093 ActorRef leaderActor =
1094 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1096 MockRaftActorContext leaderActorContext =
1097 new MockRaftActorContext("leader", getSystem(), leaderActor);
1099 Leader leader = new Leader(leaderActorContext);
1101 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1103 assertEquals(RaftState.Leader, raftActorBehavior.state());
1105 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1107 assertEquals(RaftState.Leader, raftActorBehavior.state());
1112 public void testIsolatedLeaderCheckNoFollowers() {
1113 new JavaTestKit(getSystem()) {{
1114 ActorRef leaderActor = getTestActor();
1116 MockRaftActorContext leaderActorContext =
1117 new MockRaftActorContext("leader", getSystem(), leaderActor);
1119 Map<String, String> peerAddresses = new HashMap<>();
1120 leaderActorContext.setPeerAddresses(peerAddresses);
1122 Leader leader = new Leader(leaderActorContext);
1123 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1124 Assert.assertTrue(behavior instanceof Leader);
1129 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1130 new JavaTestKit(getSystem()) {{
1132 ActorRef followerActor1 = getTestActor();
1133 ActorRef followerActor2 = getTestActor();
1135 MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1137 Map<String, String> peerAddresses = new HashMap<>();
1138 peerAddresses.put("follower-1", followerActor1.path().toString());
1139 peerAddresses.put("follower-2", followerActor2.path().toString());
1141 leaderActorContext.setPeerAddresses(peerAddresses);
1143 Leader leader = new Leader(leaderActorContext);
1144 leader.stopIsolatedLeaderCheckSchedule();
1146 leader.markFollowerActive("follower-1");
1147 leader.markFollowerActive("follower-2");
1148 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1149 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1150 behavior instanceof Leader);
1152 // kill 1 follower and verify if that got killed
1153 final JavaTestKit probe = new JavaTestKit(getSystem());
1154 probe.watch(followerActor1);
1155 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1156 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1157 assertEquals(termMsg1.getActor(), followerActor1);
1159 leader.markFollowerInActive("follower-1");
1160 leader.markFollowerActive("follower-2");
1161 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1162 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1163 behavior instanceof Leader);
1165 // kill 2nd follower and leader should change to Isolated leader
1166 followerActor2.tell(PoisonPill.getInstance(), null);
1167 probe.watch(followerActor2);
1168 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1169 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1170 assertEquals(termMsg2.getActor(), followerActor2);
1172 leader.markFollowerInActive("follower-2");
1173 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1174 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1175 behavior instanceof IsolatedLeader);
1180 class MockLeader extends Leader {
1182 FollowerToSnapshot fts;
1184 public MockLeader(RaftActorContext context){
1188 public FollowerToSnapshot getFollowerToSnapshot() {
1192 public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1193 fts = new FollowerToSnapshot(bs);
1194 setFollowerSnapshot(followerId, fts);
1198 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1200 private final long electionTimeOutIntervalMillis;
1201 private final int snapshotChunkSize;
1203 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1205 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1206 this.snapshotChunkSize = snapshotChunkSize;
1210 public FiniteDuration getElectionTimeOutInterval() {
1211 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1215 public int getSnapshotChunkSize() {
1216 return snapshotChunkSize;