1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.util.HashMap;
19 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Assert;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftState;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
47 import scala.concurrent.duration.FiniteDuration;
49 public class LeaderTest extends AbstractRaftActorBehaviorTest {
51 private final ActorRef leaderActor =
52 getSystem().actorOf(Props.create(DoNothingActor.class));
53 private final ActorRef senderActor =
54 getSystem().actorOf(Props.create(DoNothingActor.class));
57 public void testHandleMessageForUnknownMessage() throws Exception {
58 new JavaTestKit(getSystem()) {{
60 new Leader(createActorContext());
62 // handle message should return the Leader state when it receives an
64 RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
65 Assert.assertTrue(behavior instanceof Leader);
70 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
71 new JavaTestKit(getSystem()) {{
72 new Within(duration("1 seconds")) {
74 protected void run() {
75 ActorRef followerActor = getTestActor();
77 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
79 Map<String, String> peerAddresses = new HashMap<>();
81 String followerId = "follower";
82 peerAddresses.put(followerId, followerActor.path().toString());
84 actorContext.setPeerAddresses(peerAddresses);
87 actorContext.getTermInformation().update(term, "");
89 Leader leader = new Leader(actorContext);
91 // Leader should send an immediate heartbeat with no entries as follower is inactive.
92 long lastIndex = actorContext.getReplicatedLog().lastIndex();
93 AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
94 assertEquals("getTerm", term, appendEntries.getTerm());
95 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
96 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
97 assertEquals("Entries size", 0, appendEntries.getEntries().size());
99 // The follower would normally reply - simulate that explicitly here.
100 leader.handleMessage(followerActor, new AppendEntriesReply(
101 followerId, term, true, lastIndex - 1, term));
102 assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
104 // Sleep for the heartbeat interval so AppendEntries is sent.
105 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
106 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
108 leader.handleMessage(senderActor, new SendHeartBeat());
110 appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
111 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
112 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
113 assertEquals("Entries size", 1, appendEntries.getEntries().size());
114 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
115 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
122 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
123 new JavaTestKit(getSystem()) {{
124 new Within(duration("1 seconds")) {
126 protected void run() {
127 ActorRef followerActor = getTestActor();
129 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
131 Map<String, String> peerAddresses = new HashMap<>();
133 String followerId = "follower";
134 peerAddresses.put(followerId, followerActor.path().toString());
136 actorContext.setPeerAddresses(peerAddresses);
139 actorContext.getTermInformation().update(term, "");
141 Leader leader = new Leader(actorContext);
143 // Leader will send an immediate heartbeat - ignore it.
144 expectMsgClass(duration("5 seconds"), AppendEntries.class);
146 // The follower would normally reply - simulate that explicitly here.
147 long lastIndex = actorContext.getReplicatedLog().lastIndex();
148 leader.handleMessage(followerActor, new AppendEntriesReply(
149 followerId, term, true, lastIndex, term));
150 assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
152 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
153 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
154 1, lastIndex + 1, payload);
155 actorContext.getReplicatedLog().append(newEntry);
156 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
157 new Replicate(null, null, newEntry));
159 // State should not change
160 assertTrue(raftBehavior instanceof Leader);
162 AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
163 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
164 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
165 assertEquals("Entries size", 1, appendEntries.getEntries().size());
166 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
167 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
168 assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
175 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
176 new JavaTestKit(getSystem()) {{
177 new Within(duration("1 seconds")) {
179 protected void run() {
181 ActorRef raftActor = getTestActor();
183 MockRaftActorContext actorContext =
184 new MockRaftActorContext("test", getSystem(), raftActor);
186 actorContext.getReplicatedLog().removeFrom(0);
188 actorContext.setReplicatedLog(
189 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
192 Leader leader = new Leader(actorContext);
193 RaftActorBehavior raftBehavior = leader
194 .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
196 // State should not change
197 assertTrue(raftBehavior instanceof Leader);
199 assertEquals(1, actorContext.getCommitIndex());
202 new ExpectMsg<String>(duration("1 seconds"),
204 // do not put code outside this method, will run afterwards
206 protected String match(Object in) {
207 if (in instanceof ApplyState) {
208 if (((ApplyState) in).getIdentifier().equals("state-id")) {
216 }.get(); // this extracts the received message
218 assertEquals("match", out);
226 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
227 new JavaTestKit(getSystem()) {{
228 ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
230 Map<String, String> peerAddresses = new HashMap<>();
231 peerAddresses.put(followerActor.path().toString(),
232 followerActor.path().toString());
234 MockRaftActorContext actorContext =
235 (MockRaftActorContext) createActorContext(leaderActor);
236 actorContext.setPeerAddresses(peerAddresses);
238 Map<String, String> leadersSnapshot = new HashMap<>();
239 leadersSnapshot.put("1", "A");
240 leadersSnapshot.put("2", "B");
241 leadersSnapshot.put("3", "C");
244 actorContext.getReplicatedLog().removeFrom(0);
246 final int followersLastIndex = 2;
247 final int snapshotIndex = 3;
248 final int newEntryIndex = 4;
249 final int snapshotTerm = 1;
250 final int currentTerm = 2;
252 // set the snapshot variables in replicatedlog
253 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
254 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
255 actorContext.setCommitIndex(followersLastIndex);
256 //set follower timeout to 2 mins, helps during debugging
257 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
259 MockLeader leader = new MockLeader(actorContext);
262 ReplicatedLogImplEntry entry =
263 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
264 new MockRaftActorContext.MockPayload("D"));
266 //update follower timestamp
267 leader.markFollowerActive(followerActor.path().toString());
269 ByteString bs = toByteString(leadersSnapshot);
270 leader.setSnapshot(Optional.of(bs));
271 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
273 //send first chunk and no InstallSnapshotReply received yet
274 leader.getFollowerToSnapshot().getNextChunk();
275 leader.getFollowerToSnapshot().incrementChunkIndex();
277 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
278 TimeUnit.MILLISECONDS);
280 leader.handleMessage(leaderActor, new SendHeartBeat());
282 AppendEntries aeproto = MessageCollectorActor.getFirstMatching(
283 followerActor, AppendEntries.class);
285 assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
286 "received", aeproto);
288 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
290 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
292 //InstallSnapshotReply received
293 leader.getFollowerToSnapshot().markSendStatus(true);
295 leader.handleMessage(senderActor, new SendHeartBeat());
297 InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.getFirstMatching(followerActor,
298 InstallSnapshot.SERIALIZABLE_CLASS);
300 assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
303 InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
305 assertEquals(snapshotIndex, is.getLastIncludedIndex());
311 public void testSendAppendEntriesSnapshotScenario() {
312 new JavaTestKit(getSystem()) {{
314 ActorRef followerActor = getTestActor();
316 Map<String, String> peerAddresses = new HashMap<>();
317 peerAddresses.put(followerActor.path().toString(),
318 followerActor.path().toString());
320 MockRaftActorContext actorContext =
321 (MockRaftActorContext) createActorContext(getRef());
322 actorContext.setPeerAddresses(peerAddresses);
324 Map<String, String> leadersSnapshot = new HashMap<>();
325 leadersSnapshot.put("1", "A");
326 leadersSnapshot.put("2", "B");
327 leadersSnapshot.put("3", "C");
330 actorContext.getReplicatedLog().removeFrom(0);
332 final int followersLastIndex = 2;
333 final int snapshotIndex = 3;
334 final int newEntryIndex = 4;
335 final int snapshotTerm = 1;
336 final int currentTerm = 2;
338 // set the snapshot variables in replicatedlog
339 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
340 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
341 actorContext.setCommitIndex(followersLastIndex);
343 Leader leader = new Leader(actorContext);
346 ReplicatedLogImplEntry entry =
347 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
348 new MockRaftActorContext.MockPayload("D"));
350 //update follower timestamp
351 leader.markFollowerActive(followerActor.path().toString());
353 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
354 TimeUnit.MILLISECONDS);
356 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
357 RaftActorBehavior raftBehavior = leader.handleMessage(
358 senderActor, new Replicate(null, "state-id", entry));
360 assertTrue(raftBehavior instanceof Leader);
362 // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
363 Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
365 protected Boolean match(Object o) throws Exception {
366 if (o instanceof InitiateInstallSnapshot) {
373 boolean initiateInitiateInstallSnapshot = false;
374 for (Boolean b: matches) {
375 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
378 assertTrue(initiateInitiateInstallSnapshot);
383 public void testInitiateInstallSnapshot() throws Exception {
384 new JavaTestKit(getSystem()) {{
386 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
388 ActorRef followerActor = getTestActor();
390 Map<String, String> peerAddresses = new HashMap<>();
391 peerAddresses.put(followerActor.path().toString(),
392 followerActor.path().toString());
395 MockRaftActorContext actorContext =
396 (MockRaftActorContext) createActorContext(leaderActor);
397 actorContext.setPeerAddresses(peerAddresses);
399 Map<String, String> leadersSnapshot = new HashMap<>();
400 leadersSnapshot.put("1", "A");
401 leadersSnapshot.put("2", "B");
402 leadersSnapshot.put("3", "C");
405 actorContext.getReplicatedLog().removeFrom(0);
407 final int followersLastIndex = 2;
408 final int snapshotIndex = 3;
409 final int newEntryIndex = 4;
410 final int snapshotTerm = 1;
411 final int currentTerm = 2;
413 // set the snapshot variables in replicatedlog
414 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
415 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
416 actorContext.setLastApplied(3);
417 actorContext.setCommitIndex(followersLastIndex);
419 Leader leader = new Leader(actorContext);
420 // set the snapshot as absent and check if capture-snapshot is invoked.
421 leader.setSnapshot(Optional.<ByteString>absent());
424 ReplicatedLogImplEntry entry =
425 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
426 new MockRaftActorContext.MockPayload("D"));
428 actorContext.getReplicatedLog().append(entry);
430 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
431 RaftActorBehavior raftBehavior = leader.handleMessage(
432 leaderActor, new InitiateInstallSnapshot());
434 CaptureSnapshot cs = MessageCollectorActor.
435 getFirstMatching(leaderActor, CaptureSnapshot.class);
439 assertTrue(cs.isInstallSnapshotInitiated());
440 assertEquals(3, cs.getLastAppliedIndex());
441 assertEquals(1, cs.getLastAppliedTerm());
442 assertEquals(4, cs.getLastIndex());
443 assertEquals(2, cs.getLastTerm());
445 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
446 raftBehavior = leader.handleMessage(leaderActor, new InitiateInstallSnapshot());
447 List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
448 assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
454 public void testInstallSnapshot() {
455 new JavaTestKit(getSystem()) {{
457 ActorRef followerActor = getTestActor();
459 Map<String, String> peerAddresses = new HashMap<>();
460 peerAddresses.put(followerActor.path().toString(),
461 followerActor.path().toString());
463 MockRaftActorContext actorContext =
464 (MockRaftActorContext) createActorContext();
465 actorContext.setPeerAddresses(peerAddresses);
468 Map<String, String> leadersSnapshot = new HashMap<>();
469 leadersSnapshot.put("1", "A");
470 leadersSnapshot.put("2", "B");
471 leadersSnapshot.put("3", "C");
474 actorContext.getReplicatedLog().removeFrom(0);
476 final int followersLastIndex = 2;
477 final int snapshotIndex = 3;
478 final int newEntryIndex = 4;
479 final int snapshotTerm = 1;
480 final int currentTerm = 2;
482 // set the snapshot variables in replicatedlog
483 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
484 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
485 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
486 actorContext.setCommitIndex(followersLastIndex);
488 Leader leader = new Leader(actorContext);
490 // Ignore initial heartbeat.
491 expectMsgClass(duration("5 seconds"), AppendEntries.class);
494 ReplicatedLogImplEntry entry =
495 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
496 new MockRaftActorContext.MockPayload("D"));
498 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
499 new SendInstallSnapshot(toByteString(leadersSnapshot)));
501 assertTrue(raftBehavior instanceof Leader);
503 // check if installsnapshot gets called with the correct values.
505 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
506 // do not put code outside this method, will run afterwards
508 protected String match(Object in) {
509 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
510 InstallSnapshot is = (InstallSnapshot)
511 SerializationUtils.fromSerializable(in);
512 if (is.getData() == null) {
513 return "InstallSnapshot data is null";
515 if (is.getLastIncludedIndex() != snapshotIndex) {
516 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
518 if (is.getLastIncludedTerm() != snapshotTerm) {
519 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
521 if (is.getTerm() == currentTerm) {
522 return is.getTerm() + "!=" + currentTerm;
528 return "message mismatch:" + in.getClass();
531 }.get(); // this extracts the received message
533 assertEquals("match", out);
538 public void testHandleInstallSnapshotReplyLastChunk() {
539 new JavaTestKit(getSystem()) {{
541 ActorRef followerActor = getTestActor();
543 Map<String, String> peerAddresses = new HashMap<>();
544 peerAddresses.put(followerActor.path().toString(),
545 followerActor.path().toString());
547 final int followersLastIndex = 2;
548 final int snapshotIndex = 3;
549 final int newEntryIndex = 4;
550 final int snapshotTerm = 1;
551 final int currentTerm = 2;
553 MockRaftActorContext actorContext =
554 (MockRaftActorContext) createActorContext();
555 actorContext.setPeerAddresses(peerAddresses);
556 actorContext.setCommitIndex(followersLastIndex);
558 MockLeader leader = new MockLeader(actorContext);
560 // Ignore initial heartbeat.
561 expectMsgClass(duration("5 seconds"), AppendEntries.class);
563 Map<String, String> leadersSnapshot = new HashMap<>();
564 leadersSnapshot.put("1", "A");
565 leadersSnapshot.put("2", "B");
566 leadersSnapshot.put("3", "C");
568 // set the snapshot variables in replicatedlog
570 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
571 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
572 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
574 ByteString bs = toByteString(leadersSnapshot);
575 leader.setSnapshot(Optional.of(bs));
576 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
577 while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
578 leader.getFollowerToSnapshot().getNextChunk();
579 leader.getFollowerToSnapshot().incrementChunkIndex();
583 actorContext.getReplicatedLog().removeFrom(0);
585 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
586 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
587 leader.getFollowerToSnapshot().getChunkIndex(), true));
589 assertTrue(raftBehavior instanceof Leader);
591 assertEquals(0, leader.followerSnapshotSize());
592 assertEquals(1, leader.followerLogSize());
593 assertNotNull(leader.getFollower(followerActor.path().toString()));
594 FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
595 assertEquals(snapshotIndex, fli.getMatchIndex());
596 assertEquals(snapshotIndex, fli.getMatchIndex());
597 assertEquals(snapshotIndex + 1, fli.getNextIndex());
601 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
602 new JavaTestKit(getSystem()) {{
604 TestActorRef<MessageCollectorActor> followerActor =
605 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
607 Map<String, String> peerAddresses = new HashMap<>();
608 peerAddresses.put("follower-reply",
609 followerActor.path().toString());
611 final int followersLastIndex = 2;
612 final int snapshotIndex = 3;
613 final int snapshotTerm = 1;
614 final int currentTerm = 2;
616 MockRaftActorContext actorContext =
617 (MockRaftActorContext) createActorContext();
618 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
620 public int getSnapshotChunkSize() {
624 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
625 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
627 actorContext.setConfigParams(configParams);
628 actorContext.setPeerAddresses(peerAddresses);
629 actorContext.setCommitIndex(followersLastIndex);
631 MockLeader leader = new MockLeader(actorContext);
633 Map<String, String> leadersSnapshot = new HashMap<>();
634 leadersSnapshot.put("1", "A");
635 leadersSnapshot.put("2", "B");
636 leadersSnapshot.put("3", "C");
638 // set the snapshot variables in replicatedlog
639 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
640 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
641 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
643 ByteString bs = toByteString(leadersSnapshot);
644 leader.setSnapshot(Optional.of(bs));
646 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
648 List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
649 InstallSnapshotMessages.InstallSnapshot.class);
651 assertEquals(1, objectList.size());
653 Object o = objectList.get(0);
654 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
656 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
658 assertEquals(1, installSnapshot.getChunkIndex());
659 assertEquals(3, installSnapshot.getTotalChunks());
661 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
662 "follower-reply", installSnapshot.getChunkIndex(), true));
664 objectList = MessageCollectorActor.getAllMatching(followerActor,
665 InstallSnapshotMessages.InstallSnapshot.class);
667 assertEquals(2, objectList.size());
669 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
671 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
672 "follower-reply", installSnapshot.getChunkIndex(), true));
674 objectList = MessageCollectorActor.getAllMatching(followerActor,
675 InstallSnapshotMessages.InstallSnapshot.class);
677 assertEquals(3, objectList.size());
679 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
681 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
682 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
683 "follower-reply", installSnapshot.getChunkIndex(), true));
685 objectList = MessageCollectorActor.getAllMatching(followerActor,
686 InstallSnapshotMessages.InstallSnapshot.class);
688 // Count should still stay at 3
689 assertEquals(3, objectList.size());
695 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
696 new JavaTestKit(getSystem()) {{
698 TestActorRef<MessageCollectorActor> followerActor =
699 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
701 Map<String, String> peerAddresses = new HashMap<>();
702 peerAddresses.put(followerActor.path().toString(),
703 followerActor.path().toString());
705 final int followersLastIndex = 2;
706 final int snapshotIndex = 3;
707 final int snapshotTerm = 1;
708 final int currentTerm = 2;
710 MockRaftActorContext actorContext =
711 (MockRaftActorContext) createActorContext();
713 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
715 public int getSnapshotChunkSize() {
719 actorContext.setPeerAddresses(peerAddresses);
720 actorContext.setCommitIndex(followersLastIndex);
722 MockLeader leader = new MockLeader(actorContext);
724 Map<String, String> leadersSnapshot = new HashMap<>();
725 leadersSnapshot.put("1", "A");
726 leadersSnapshot.put("2", "B");
727 leadersSnapshot.put("3", "C");
729 // set the snapshot variables in replicatedlog
730 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
731 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
732 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
734 ByteString bs = toByteString(leadersSnapshot);
735 leader.setSnapshot(Optional.of(bs));
737 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
739 MessageCollectorActor.getAllMatching(followerActor,
740 InstallSnapshotMessages.InstallSnapshot.class);
742 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
743 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
744 assertNotNull(installSnapshot);
746 assertEquals(1, installSnapshot.getChunkIndex());
747 assertEquals(3, installSnapshot.getTotalChunks());
749 followerActor.underlyingActor().clear();
751 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
752 followerActor.path().toString(), -1, false));
754 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
755 TimeUnit.MILLISECONDS);
757 leader.handleMessage(leaderActor, new SendHeartBeat());
759 installSnapshot = MessageCollectorActor.getFirstMatching(
760 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
761 assertNotNull(installSnapshot);
763 assertEquals(1, installSnapshot.getChunkIndex());
764 assertEquals(3, installSnapshot.getTotalChunks());
766 followerActor.tell(PoisonPill.getInstance(), getRef());
771 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
772 new JavaTestKit(getSystem()) {
774 TestActorRef<MessageCollectorActor> followerActor =
775 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
777 Map<String, String> peerAddresses = new HashMap<>();
778 peerAddresses.put(followerActor.path().toString(),
779 followerActor.path().toString());
781 final int followersLastIndex = 2;
782 final int snapshotIndex = 3;
783 final int snapshotTerm = 1;
784 final int currentTerm = 2;
786 MockRaftActorContext actorContext =
787 (MockRaftActorContext) createActorContext();
789 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
791 public int getSnapshotChunkSize() {
795 actorContext.setPeerAddresses(peerAddresses);
796 actorContext.setCommitIndex(followersLastIndex);
798 MockLeader leader = new MockLeader(actorContext);
800 Map<String, String> leadersSnapshot = new HashMap<>();
801 leadersSnapshot.put("1", "A");
802 leadersSnapshot.put("2", "B");
803 leadersSnapshot.put("3", "C");
805 // set the snapshot variables in replicatedlog
806 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
807 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
808 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
810 ByteString bs = toByteString(leadersSnapshot);
811 leader.setSnapshot(Optional.of(bs));
813 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
815 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
816 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
817 assertNotNull(installSnapshot);
819 assertEquals(1, installSnapshot.getChunkIndex());
820 assertEquals(3, installSnapshot.getTotalChunks());
821 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
823 int hashCode = installSnapshot.getData().hashCode();
825 followerActor.underlyingActor().clear();
827 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
829 installSnapshot = MessageCollectorActor.getFirstMatching(
830 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
831 assertNotNull(installSnapshot);
833 assertEquals(2, installSnapshot.getChunkIndex());
834 assertEquals(3, installSnapshot.getTotalChunks());
835 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
837 followerActor.tell(PoisonPill.getInstance(), getRef());
842 public void testFollowerToSnapshotLogic() {
844 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
846 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
848 public int getSnapshotChunkSize() {
853 MockLeader leader = new MockLeader(actorContext);
855 Map<String, String> leadersSnapshot = new HashMap<>();
856 leadersSnapshot.put("1", "A");
857 leadersSnapshot.put("2", "B");
858 leadersSnapshot.put("3", "C");
860 ByteString bs = toByteString(leadersSnapshot);
861 byte[] barray = bs.toByteArray();
863 leader.createFollowerToSnapshot("followerId", bs);
864 assertEquals(bs.size(), barray.length);
867 for (int i=0; i < barray.length; i = i + 50) {
871 if (i + 50 > barray.length) {
875 ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
876 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
877 assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
879 leader.getFollowerToSnapshot().markSendStatus(true);
880 if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
881 leader.getFollowerToSnapshot().incrementChunkIndex();
885 assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
889 @Override protected RaftActorBehavior createBehavior(
890 RaftActorContext actorContext) {
891 return new Leader(actorContext);
894 @Override protected RaftActorContext createActorContext() {
895 return createActorContext(leaderActor);
899 protected RaftActorContext createActorContext(ActorRef actorRef) {
900 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
901 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
902 configParams.setElectionTimeoutFactor(100000);
903 MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), actorRef);
904 context.setConfigParams(configParams);
908 private ByteString toByteString(Map<String, String> state) {
909 ByteArrayOutputStream b = null;
910 ObjectOutputStream o = null;
913 b = new ByteArrayOutputStream();
914 o = new ObjectOutputStream(b);
915 o.writeObject(state);
916 byte[] snapshotBytes = b.toByteArray();
917 return ByteString.copyFrom(snapshotBytes);
927 } catch (IOException e) {
928 Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
933 public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
934 AbstractRaftActorBehavior behavior;
936 @Override public void onReceive(Object message) throws Exception {
937 if(behavior != null) {
938 behavior.handleMessage(sender(), message);
941 super.onReceive(message);
944 public static Props props() {
945 return Props.create(ForwardMessageToBehaviorActor.class);
950 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
951 new JavaTestKit(getSystem()) {{
952 TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
953 Props.create(ForwardMessageToBehaviorActor.class));
955 MockRaftActorContext leaderActorContext =
956 new MockRaftActorContext("leader", getSystem(), leaderActor);
958 TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
959 ForwardMessageToBehaviorActor.props());
961 MockRaftActorContext followerActorContext =
962 new MockRaftActorContext("follower", getSystem(), followerActor);
964 Follower follower = new Follower(followerActorContext);
965 followerActor.underlyingActor().behavior = follower;
967 Map<String, String> peerAddresses = new HashMap<>();
968 peerAddresses.put("follower", followerActor.path().toString());
970 leaderActorContext.setPeerAddresses(peerAddresses);
972 leaderActorContext.getReplicatedLog().removeFrom(0);
975 leaderActorContext.setReplicatedLog(
976 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
978 leaderActorContext.setCommitIndex(1);
980 followerActorContext.getReplicatedLog().removeFrom(0);
982 // follower too has the exact same log entries and has the same commit index
983 followerActorContext.setReplicatedLog(
984 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
986 followerActorContext.setCommitIndex(1);
988 Leader leader = new Leader(leaderActorContext);
990 AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
991 assertNotNull(appendEntries);
993 assertEquals(1, appendEntries.getLeaderCommit());
994 assertEquals(0, appendEntries.getEntries().size());
995 assertEquals(0, appendEntries.getPrevLogIndex());
997 AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
998 leaderActor, AppendEntriesReply.class);
999 assertNotNull(appendEntriesReply);
1001 assertEquals(2, appendEntriesReply.getLogLastIndex());
1002 assertEquals(1, appendEntriesReply.getLogLastTerm());
1004 // follower returns its next index
1005 assertEquals(2, appendEntriesReply.getLogLastIndex());
1006 assertEquals(1, appendEntriesReply.getLogLastTerm());
1012 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1013 new JavaTestKit(getSystem()) {{
1014 TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
1015 Props.create(ForwardMessageToBehaviorActor.class));
1017 MockRaftActorContext leaderActorContext =
1018 new MockRaftActorContext("leader", getSystem(), leaderActor);
1020 TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
1021 ForwardMessageToBehaviorActor.props());
1023 MockRaftActorContext followerActorContext =
1024 new MockRaftActorContext("follower", getSystem(), followerActor);
1026 Follower follower = new Follower(followerActorContext);
1027 followerActor.underlyingActor().behavior = follower;
1029 Map<String, String> peerAddresses = new HashMap<>();
1030 peerAddresses.put("follower", followerActor.path().toString());
1032 leaderActorContext.setPeerAddresses(peerAddresses);
1034 leaderActorContext.getReplicatedLog().removeFrom(0);
1036 leaderActorContext.setReplicatedLog(
1037 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1039 leaderActorContext.setCommitIndex(1);
1041 followerActorContext.getReplicatedLog().removeFrom(0);
1043 followerActorContext.setReplicatedLog(
1044 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1046 // follower has the same log entries but its commit index > leaders commit index
1047 followerActorContext.setCommitIndex(2);
1049 Leader leader = new Leader(leaderActorContext);
1051 // Initial heartbeat
1052 AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1053 assertNotNull(appendEntries);
1055 assertEquals(1, appendEntries.getLeaderCommit());
1056 assertEquals(0, appendEntries.getEntries().size());
1057 assertEquals(0, appendEntries.getPrevLogIndex());
1059 AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
1060 leaderActor, AppendEntriesReply.class);
1061 assertNotNull(appendEntriesReply);
1063 assertEquals(2, appendEntriesReply.getLogLastIndex());
1064 assertEquals(1, appendEntriesReply.getLogLastTerm());
1066 leaderActor.underlyingActor().behavior = leader;
1067 leader.handleMessage(followerActor, appendEntriesReply);
1069 leaderActor.underlyingActor().clear();
1070 followerActor.underlyingActor().clear();
1072 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1073 TimeUnit.MILLISECONDS);
1075 leader.handleMessage(leaderActor, new SendHeartBeat());
1077 appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1078 assertNotNull(appendEntries);
1080 assertEquals(1, appendEntries.getLeaderCommit());
1081 assertEquals(0, appendEntries.getEntries().size());
1082 assertEquals(2, appendEntries.getPrevLogIndex());
1084 appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
1085 assertNotNull(appendEntriesReply);
1087 assertEquals(2, appendEntriesReply.getLogLastIndex());
1088 assertEquals(1, appendEntriesReply.getLogLastTerm());
1090 assertEquals(1, followerActorContext.getCommitIndex());
1095 public void testHandleAppendEntriesReplyFailure(){
1096 new JavaTestKit(getSystem()) {
1099 ActorRef leaderActor =
1100 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1102 ActorRef followerActor =
1103 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1106 MockRaftActorContext leaderActorContext =
1107 new MockRaftActorContext("leader", getSystem(), leaderActor);
1109 Map<String, String> peerAddresses = new HashMap<>();
1110 peerAddresses.put("follower-1",
1111 followerActor.path().toString());
1113 leaderActorContext.setPeerAddresses(peerAddresses);
1115 Leader leader = new Leader(leaderActorContext);
1117 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1119 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1121 assertEquals(RaftState.Leader, raftActorBehavior.state());
1127 public void testHandleAppendEntriesReplySuccess() throws Exception {
1128 new JavaTestKit(getSystem()) {
1131 ActorRef leaderActor =
1132 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1134 ActorRef followerActor =
1135 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1138 MockRaftActorContext leaderActorContext =
1139 new MockRaftActorContext("leader", getSystem(), leaderActor);
1141 leaderActorContext.setReplicatedLog(
1142 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1144 Map<String, String> peerAddresses = new HashMap<>();
1145 peerAddresses.put("follower-1",
1146 followerActor.path().toString());
1148 leaderActorContext.setPeerAddresses(peerAddresses);
1149 leaderActorContext.setCommitIndex(1);
1150 leaderActorContext.setLastApplied(1);
1151 leaderActorContext.getTermInformation().update(1, "leader");
1153 Leader leader = new Leader(leaderActorContext);
1155 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1157 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1159 assertEquals(RaftState.Leader, raftActorBehavior.state());
1161 assertEquals(2, leaderActorContext.getCommitIndex());
1163 ApplyLogEntries applyLogEntries =
1164 MessageCollectorActor.getFirstMatching(leaderActor,
1165 ApplyLogEntries.class);
1167 assertNotNull(applyLogEntries);
1169 assertEquals(2, leaderActorContext.getLastApplied());
1171 assertEquals(2, applyLogEntries.getToIndex());
1173 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1176 assertEquals(1,applyStateList.size());
1178 ApplyState applyState = (ApplyState) applyStateList.get(0);
1180 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1186 public void testHandleAppendEntriesReplyUnknownFollower(){
1187 new JavaTestKit(getSystem()) {
1190 ActorRef leaderActor =
1191 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1193 MockRaftActorContext leaderActorContext =
1194 new MockRaftActorContext("leader", getSystem(), leaderActor);
1196 Leader leader = new Leader(leaderActorContext);
1198 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1200 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1202 assertEquals(RaftState.Leader, raftActorBehavior.state());
1208 public void testHandleRequestVoteReply(){
1209 new JavaTestKit(getSystem()) {
1212 ActorRef leaderActor =
1213 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1215 MockRaftActorContext leaderActorContext =
1216 new MockRaftActorContext("leader", getSystem(), leaderActor);
1218 Leader leader = new Leader(leaderActorContext);
1220 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1222 assertEquals(RaftState.Leader, raftActorBehavior.state());
1224 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1226 assertEquals(RaftState.Leader, raftActorBehavior.state());
1231 public void testIsolatedLeaderCheckNoFollowers() {
1232 new JavaTestKit(getSystem()) {{
1233 ActorRef leaderActor = getTestActor();
1235 MockRaftActorContext leaderActorContext =
1236 new MockRaftActorContext("leader", getSystem(), leaderActor);
1238 Map<String, String> peerAddresses = new HashMap<>();
1239 leaderActorContext.setPeerAddresses(peerAddresses);
1241 Leader leader = new Leader(leaderActorContext);
1242 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1243 Assert.assertTrue(behavior instanceof Leader);
1248 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1249 new JavaTestKit(getSystem()) {{
1251 ActorRef followerActor1 = getTestActor();
1252 ActorRef followerActor2 = getTestActor();
1254 MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1256 Map<String, String> peerAddresses = new HashMap<>();
1257 peerAddresses.put("follower-1", followerActor1.path().toString());
1258 peerAddresses.put("follower-2", followerActor2.path().toString());
1260 leaderActorContext.setPeerAddresses(peerAddresses);
1262 Leader leader = new Leader(leaderActorContext);
1263 leader.stopIsolatedLeaderCheckSchedule();
1265 leader.markFollowerActive("follower-1");
1266 leader.markFollowerActive("follower-2");
1267 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1268 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1269 behavior instanceof Leader);
1271 // kill 1 follower and verify if that got killed
1272 final JavaTestKit probe = new JavaTestKit(getSystem());
1273 probe.watch(followerActor1);
1274 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1275 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1276 assertEquals(termMsg1.getActor(), followerActor1);
1278 leader.markFollowerInActive("follower-1");
1279 leader.markFollowerActive("follower-2");
1280 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1281 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1282 behavior instanceof Leader);
1284 // kill 2nd follower and leader should change to Isolated leader
1285 followerActor2.tell(PoisonPill.getInstance(), null);
1286 probe.watch(followerActor2);
1287 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1288 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1289 assertEquals(termMsg2.getActor(), followerActor2);
1291 leader.markFollowerInActive("follower-2");
1292 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1293 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1294 behavior instanceof IsolatedLeader);
1301 public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1302 new JavaTestKit(getSystem()) {{
1303 TestActorRef<MessageCollectorActor> leaderActor = TestActorRef.create(getSystem(),
1304 Props.create(MessageCollectorActor.class));
1306 MockRaftActorContext leaderActorContext =
1307 new MockRaftActorContext("leader", getSystem(), leaderActor);
1309 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1310 //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1311 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1313 leaderActorContext.setConfigParams(configParams);
1315 TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
1316 ForwardMessageToBehaviorActor.props());
1318 MockRaftActorContext followerActorContext =
1319 new MockRaftActorContext("follower-reply", getSystem(), followerActor);
1321 followerActorContext.setConfigParams(configParams);
1323 Follower follower = new Follower(followerActorContext);
1324 followerActor.underlyingActor().behavior = follower;
1326 Map<String, String> peerAddresses = new HashMap<>();
1327 peerAddresses.put("follower-reply",
1328 followerActor.path().toString());
1330 leaderActorContext.setPeerAddresses(peerAddresses);
1332 leaderActorContext.getReplicatedLog().removeFrom(0);
1333 leaderActorContext.setCommitIndex(-1);
1334 leaderActorContext.setLastApplied(-1);
1336 followerActorContext.getReplicatedLog().removeFrom(0);
1337 followerActorContext.setCommitIndex(-1);
1338 followerActorContext.setLastApplied(-1);
1340 Leader leader = new Leader(leaderActorContext);
1342 AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
1343 leaderActor, AppendEntriesReply.class);
1344 assertNotNull(appendEntriesReply);
1345 System.out.println("appendEntriesReply: "+appendEntriesReply);
1346 leader.handleMessage(followerActor, appendEntriesReply);
1348 // Clear initial heartbeat messages
1350 leaderActor.underlyingActor().clear();
1351 followerActor.underlyingActor().clear();
1354 leaderActorContext.setReplicatedLog(
1355 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1356 leaderActorContext.setCommitIndex(1);
1357 leaderActorContext.setLastApplied(1);
1359 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1360 TimeUnit.MILLISECONDS);
1362 leader.handleMessage(leaderActor, new SendHeartBeat());
1364 AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1365 assertNotNull(appendEntries);
1367 // Should send first log entry
1368 assertEquals(1, appendEntries.getLeaderCommit());
1369 assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1370 assertEquals(-1, appendEntries.getPrevLogIndex());
1372 appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
1373 assertNotNull(appendEntriesReply);
1375 assertEquals(1, appendEntriesReply.getLogLastTerm());
1376 assertEquals(0, appendEntriesReply.getLogLastIndex());
1378 followerActor.underlyingActor().clear();
1380 leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1382 appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1383 assertNotNull(appendEntries);
1385 // Should send second log entry
1386 assertEquals(1, appendEntries.getLeaderCommit());
1387 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1391 class MockLeader extends Leader {
1393 FollowerToSnapshot fts;
1395 public MockLeader(RaftActorContext context){
1399 public FollowerToSnapshot getFollowerToSnapshot() {
1403 public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1404 fts = new FollowerToSnapshot(bs);
1405 setFollowerSnapshot(followerId, fts);
1409 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1411 private final long electionTimeOutIntervalMillis;
1412 private final int snapshotChunkSize;
1414 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1416 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1417 this.snapshotChunkSize = snapshotChunkSize;
1421 public FiniteDuration getElectionTimeOutInterval() {
1422 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1426 public int getSnapshotChunkSize() {
1427 return snapshotChunkSize;