1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.util.HashMap;
19 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Assert;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftState;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
47 import org.slf4j.impl.SimpleLogger;
48 import scala.concurrent.duration.FiniteDuration;
50 public class LeaderTest extends AbstractRaftActorBehaviorTest {
// Class-level setup shared by the tests below.
// NOTE(review): the line that opens the enclosing initializer is not part of this listing.
53 // This enables trace logging for the tests.
54 System.setProperty(SimpleLogger.LOG_KEY_PREFIX + MockRaftActorContext.class.getName(), "trace");
// Do-nothing actor used as the leader's own actor by the no-arg createActorContext().
57 private final ActorRef leaderActor =
58 getSystem().actorOf(Props.create(DoNothingActor.class));
// Do-nothing actor passed as the sender when messages are handed to the behavior under test.
59 private final ActorRef senderActor =
60 getSystem().actorOf(Props.create(DoNothingActor.class));
// Verifies that handling a message the Leader does not recognise (the plain string "foo")
// returns a behavior that is still a Leader.
63 public void testHandleMessageForUnknownMessage() throws Exception {
64 new JavaTestKit(getSystem()) {{
66 new Leader(createActorContext());
68 // handle message should return the Leader state when it receives an
// The message used here is an arbitrary String, sent with senderActor as the sender.
70 RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
71 Assert.assertTrue(behavior instanceof Leader);
// Checks the AppendEntries traffic a Leader sends to a single follower (the test-kit actor):
// first an AppendEntries with no entries after the Leader is created, then, after a
// simulated AppendEntriesReply and an explicit SendHeartBeat, an AppendEntries with one entry.
// NOTE(review): `term` is declared on a line that is not part of this listing.
76 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
77 new JavaTestKit(getSystem()) {{
78 new Within(duration("1 seconds")) {
80 protected void run() {
// The test-kit actor plays the follower so expectMsgClass() sees what the leader sends.
81 ActorRef followerActor = getTestActor();
83 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
85 Map<String, String> peerAddresses = new HashMap<>();
87 String followerId = "follower";
88 peerAddresses.put(followerId, followerActor.path().toString());
90 actorContext.setPeerAddresses(peerAddresses);
93 actorContext.getTermInformation().update(term, "");
95 Leader leader = new Leader(actorContext);
97 // Leader should send an immediate heartbeat with no entries as follower is inactive.
98 long lastIndex = actorContext.getReplicatedLog().lastIndex();
99 AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
100 assertEquals("getTerm", term, appendEntries.getTerm());
101 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
102 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
103 assertEquals("Entries size", 0, appendEntries.getEntries().size());
105 // The follower would normally reply - simulate that explicitly here.
// The reply reports lastIndex - 1, which the assertions below expect back as prevLogIndex.
106 leader.handleMessage(followerActor, new AppendEntriesReply(
107 followerId, term, true, lastIndex - 1, term));
108 assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
110 // Sleep for the heartbeat interval so AppendEntries is sent.
111 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
112 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
114 leader.handleMessage(senderActor, new SendHeartBeat());
// The second AppendEntries should carry exactly the entry at lastIndex.
116 appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
117 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
118 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
119 assertEquals("Entries size", 1, appendEntries.getEntries().size());
120 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
121 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
// Verifies that handling a Replicate message for a newly appended log entry makes the Leader
// send an AppendEntries containing that entry to the follower (the test-kit actor), and that
// the returned behavior is still a Leader.
// NOTE(review): `term` is declared on a line that is not part of this listing.
128 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
129 new JavaTestKit(getSystem()) {{
130 new Within(duration("1 seconds")) {
132 protected void run() {
133 ActorRef followerActor = getTestActor();
135 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
137 Map<String, String> peerAddresses = new HashMap<>();
139 String followerId = "follower";
140 peerAddresses.put(followerId, followerActor.path().toString());
142 actorContext.setPeerAddresses(peerAddresses);
145 actorContext.getTermInformation().update(term, "");
147 Leader leader = new Leader(actorContext);
149 // Leader will send an immediate heartbeat - ignore it.
150 expectMsgClass(duration("5 seconds"), AppendEntries.class);
152 // The follower would normally reply - simulate that explicitly here.
153 long lastIndex = actorContext.getReplicatedLog().lastIndex();
154 leader.handleMessage(followerActor, new AppendEntriesReply(
155 followerId, term, true, lastIndex, term));
156 assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
// Append a new entry at lastIndex + 1 to the leader's log and ask the leader to replicate it.
158 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
159 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
160 1, lastIndex + 1, payload);
161 actorContext.getReplicatedLog().append(newEntry);
162 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
163 new Replicate(null, null, newEntry));
165 // State should not change
166 assertTrue(raftBehavior instanceof Leader);
// The AppendEntries seen by the follower must reference lastIndex as previous and carry newEntry.
168 AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
169 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
170 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
171 assertEquals("Entries size", 1, appendEntries.getEntries().size());
172 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
173 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
174 assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
// Verifies Replicate handling when no peer addresses are configured on the context: the
// behavior stays a Leader, the commit index is asserted to be 1, and the raft actor (the
// test-kit actor) is expected to receive an ApplyState whose identifier is "state-id".
// NOTE(review): several lines of this method (including the declaration of `out` and the
// values returned from match()) are not part of this listing.
181 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
182 new JavaTestKit(getSystem()) {{
183 new Within(duration("1 seconds")) {
185 protected void run() {
187 ActorRef raftActor = getTestActor();
189 MockRaftActorContext actorContext =
190 new MockRaftActorContext("test", getSystem(), raftActor);
192 actorContext.getReplicatedLog().removeFrom(0);
// Replace the log with one built by MockReplicatedLogBuilder.createEntries(0, 2, 1).
194 actorContext.setReplicatedLog(
195 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
198 Leader leader = new Leader(actorContext);
// Replicate the log entry at index 1 under the identifier "state-id".
199 RaftActorBehavior raftBehavior = leader
200 .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
202 // State should not change
203 assertTrue(raftBehavior instanceof Leader);
205 assertEquals(1, actorContext.getCommitIndex());
208 new ExpectMsg<String>(duration("1 seconds"),
210 // do not put code outside this method, will run afterwards
212 protected String match(Object in) {
213 if (in instanceof ApplyState) {
214 if (((ApplyState) in).getIdentifier().equals("state-id")) {
222 }.get(); // this extracts the received message
224 assertEquals("match", out);
// Verifies heartbeat handling while a snapshot install to a follower is in progress:
// with the first chunk sent but not yet acknowledged, a SendHeartBeat must still produce an
// AppendEntries with no entries; once the chunk is marked as sent successfully, the next
// SendHeartBeat must produce an InstallSnapshot for the follower.
// The follower is a MessageCollectorActor so sent messages can be inspected afterwards.
232 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
233 new JavaTestKit(getSystem()) {{
234 ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
// The follower's actor path string is used both as its id and as its address.
236 Map<String, String> peerAddresses = new HashMap<>();
237 peerAddresses.put(followerActor.path().toString(),
238 followerActor.path().toString());
240 MockRaftActorContext actorContext =
241 (MockRaftActorContext) createActorContext(leaderActor);
242 actorContext.setPeerAddresses(peerAddresses);
// Map that is serialized below (toByteString) and used as the leader's snapshot data.
244 Map<String, String> leadersSnapshot = new HashMap<>();
245 leadersSnapshot.put("1", "A");
246 leadersSnapshot.put("2", "B");
247 leadersSnapshot.put("3", "C");
250 actorContext.getReplicatedLog().removeFrom(0);
252 final int followersLastIndex = 2;
253 final int snapshotIndex = 3;
254 final int newEntryIndex = 4;
255 final int snapshotTerm = 1;
256 final int currentTerm = 2;
258 // set the snapshot variables in replicatedlog
259 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
260 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
261 actorContext.setCommitIndex(followersLastIndex);
262 //set follower timeout to 2 mins, helps during debugging
263 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
265 MockLeader leader = new MockLeader(actorContext);
// NOTE(review): `entry` is not referenced again in the visible lines of this method.
268 ReplicatedLogImplEntry entry =
269 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
270 new MockRaftActorContext.MockPayload("D"));
272 //update follower timestamp
273 leader.markFollowerActive(followerActor.path().toString());
275 ByteString bs = toByteString(leadersSnapshot);
276 leader.setSnapshot(Optional.of(bs));
277 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
279 //send first chunk and no InstallSnapshotReply received yet
280 leader.getFollowerToSnapshot().getNextChunk();
281 leader.getFollowerToSnapshot().incrementChunkIndex();
// Wait one heartbeat interval before triggering the heartbeat by hand.
283 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
284 TimeUnit.MILLISECONDS);
286 leader.handleMessage(leaderActor, new SendHeartBeat());
288 AppendEntries aeproto = MessageCollectorActor.getFirstMatching(
289 followerActor, AppendEntries.class);
291 assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
292 "received", aeproto);
294 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
296 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
298 //InstallSnapshotReply received
299 leader.getFollowerToSnapshot().markSendStatus(true);
301 leader.handleMessage(senderActor, new SendHeartBeat());
303 InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.getFirstMatching(followerActor,
304 InstallSnapshot.SERIALIZABLE_CLASS);
// NOTE(review): the remaining argument(s) of this assertNotNull call are on a line not shown here.
306 assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
309 InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
311 assertEquals(snapshotIndex, is.getLastIncludedIndex());
// Verifies that handling a Replicate message, with the commit index (2) below the log's
// snapshot index (3), results in an InitiateInstallSnapshot message being observed by the
// test kit. The context is created with getRef() as its actor.
// NOTE(review): the body of match() and the end of the ReceiveWhile block are not part of
// this listing.
317 public void testSendAppendEntriesSnapshotScenario() {
318 new JavaTestKit(getSystem()) {{
320 ActorRef followerActor = getTestActor();
322 Map<String, String> peerAddresses = new HashMap<>();
323 peerAddresses.put(followerActor.path().toString(),
324 followerActor.path().toString());
326 MockRaftActorContext actorContext =
327 (MockRaftActorContext) createActorContext(getRef());
328 actorContext.setPeerAddresses(peerAddresses);
// NOTE(review): leadersSnapshot is populated but not used again in the visible lines.
330 Map<String, String> leadersSnapshot = new HashMap<>();
331 leadersSnapshot.put("1", "A");
332 leadersSnapshot.put("2", "B");
333 leadersSnapshot.put("3", "C");
336 actorContext.getReplicatedLog().removeFrom(0);
338 final int followersLastIndex = 2;
339 final int snapshotIndex = 3;
340 final int newEntryIndex = 4;
341 final int snapshotTerm = 1;
342 final int currentTerm = 2;
344 // set the snapshot variables in replicatedlog
345 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
346 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
347 actorContext.setCommitIndex(followersLastIndex);
349 Leader leader = new Leader(actorContext);
// Entry at index 4, term 2, that the Replicate message below asks the leader to replicate.
352 ReplicatedLogImplEntry entry =
353 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
354 new MockRaftActorContext.MockPayload("D"));
356 //update follower timestamp
357 leader.markFollowerActive(followerActor.path().toString());
359 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
360 TimeUnit.MILLISECONDS);
362 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
363 RaftActorBehavior raftBehavior = leader.handleMessage(
364 senderActor, new Replicate(null, "state-id", entry));
366 assertTrue(raftBehavior instanceof Leader);
368 // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
369 Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
371 protected Boolean match(Object o) throws Exception {
372 if (o instanceof InitiateInstallSnapshot) {
// OR together all per-message results: true if any received message matched.
379 boolean initiateInitiateInstallSnapshot = false;
380 for (Boolean b: matches) {
381 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
384 assertTrue(initiateInitiateInstallSnapshot);
// Verifies that handling InitiateInstallSnapshot while the leader has no snapshot makes the
// leader actor receive a CaptureSnapshot with the expected indices/terms, and that a second
// InitiateInstallSnapshot does not produce a second CaptureSnapshot.
// The leader actor here is a local MessageCollectorActor (it shadows the leaderActor field).
389 public void testInitiateInstallSnapshot() throws Exception {
390 new JavaTestKit(getSystem()) {{
392 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
394 ActorRef followerActor = getTestActor();
396 Map<String, String> peerAddresses = new HashMap<>();
397 peerAddresses.put(followerActor.path().toString(),
398 followerActor.path().toString());
401 MockRaftActorContext actorContext =
402 (MockRaftActorContext) createActorContext(leaderActor);
403 actorContext.setPeerAddresses(peerAddresses);
// NOTE(review): leadersSnapshot is populated but not used again in the visible lines.
405 Map<String, String> leadersSnapshot = new HashMap<>();
406 leadersSnapshot.put("1", "A");
407 leadersSnapshot.put("2", "B");
408 leadersSnapshot.put("3", "C");
411 actorContext.getReplicatedLog().removeFrom(0);
413 final int followersLastIndex = 2;
414 final int snapshotIndex = 3;
415 final int newEntryIndex = 4;
416 final int snapshotTerm = 1;
417 final int currentTerm = 2;
419 // set the snapshot variables in replicatedlog
420 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
421 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
422 actorContext.setLastApplied(3);
423 actorContext.setCommitIndex(followersLastIndex);
425 Leader leader = new Leader(actorContext);
426 // set the snapshot as absent and check if capture-snapshot is invoked.
427 leader.setSnapshot(Optional.<ByteString>absent());
// Append an entry at index 4, term 2; the CaptureSnapshot assertions below expect these as last index/term.
430 ReplicatedLogImplEntry entry =
431 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
432 new MockRaftActorContext.MockPayload("D"));
434 actorContext.getReplicatedLog().append(entry);
436 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
437 RaftActorBehavior raftBehavior = leader.handleMessage(
438 leaderActor, new InitiateInstallSnapshot());
440 CaptureSnapshot cs = MessageCollectorActor.
441 getFirstMatching(leaderActor, CaptureSnapshot.class);
// Expected: last applied index 3 / term 1 (the snapshot values), last index 4 / term 2 (the appended entry).
445 assertTrue(cs.isInstallSnapshotInitiated());
446 assertEquals(3, cs.getLastAppliedIndex());
447 assertEquals(1, cs.getLastAppliedTerm());
448 assertEquals(4, cs.getLastIndex());
449 assertEquals(2, cs.getLastTerm());
451 // if an initiate is started again when first is in progress, it shouldn't initiate Capture
452 raftBehavior = leader.handleMessage(leaderActor, new InitiateInstallSnapshot());
453 List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
454 assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
// Verifies that handling SendInstallSnapshot makes the Leader send an InstallSnapshot to the
// follower (the test-kit actor) carrying non-null data, the log's snapshot index and term,
// and the leader's current term. The behavior must remain a Leader.
// NOTE(review): the declaration of `out` and the "match" return path of match() are on
// lines that are not part of this listing.
460 public void testInstallSnapshot() {
461 new JavaTestKit(getSystem()) {{
463 ActorRef followerActor = getTestActor();
465 Map<String, String> peerAddresses = new HashMap<>();
466 peerAddresses.put(followerActor.path().toString(),
467 followerActor.path().toString());
469 MockRaftActorContext actorContext =
470 (MockRaftActorContext) createActorContext();
471 actorContext.setPeerAddresses(peerAddresses);
// Map that is serialized (toByteString) into the SendInstallSnapshot payload below.
474 Map<String, String> leadersSnapshot = new HashMap<>();
475 leadersSnapshot.put("1", "A");
476 leadersSnapshot.put("2", "B");
477 leadersSnapshot.put("3", "C");
480 actorContext.getReplicatedLog().removeFrom(0);
482 final int followersLastIndex = 2;
483 final int snapshotIndex = 3;
484 final int newEntryIndex = 4;
485 final int snapshotTerm = 1;
486 final int currentTerm = 2;
488 // set the snapshot variables in replicatedlog
489 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
490 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
// The context's term is set to currentTerm before the Leader is created.
491 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
492 actorContext.setCommitIndex(followersLastIndex);
494 Leader leader = new Leader(actorContext);
496 // Ignore initial heartbeat.
497 expectMsgClass(duration("5 seconds"), AppendEntries.class);
// NOTE(review): `entry` is not referenced again in the visible lines of this method.
500 ReplicatedLogImplEntry entry =
501 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
502 new MockRaftActorContext.MockPayload("D"));
504 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
505 new SendInstallSnapshot(toByteString(leadersSnapshot)));
507 assertTrue(raftBehavior instanceof Leader);
509 // check if installsnapshot gets called with the correct values.
511 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
512 // do not put code outside this method, will run afterwards
514 protected String match(Object in) {
515 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
516 InstallSnapshot is = (InstallSnapshot)
517 SerializationUtils.fromSerializable(in);
// Each check returns a diagnostic string describing the first field that does not match.
518 if (is.getData() == null) {
519 return "InstallSnapshot data is null";
521 if (is.getLastIncludedIndex() != snapshotIndex) {
522 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
524 if (is.getLastIncludedTerm() != snapshotTerm) {
525 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
// Report a mismatch only when the terms differ, consistent with the "!=" message and
// with the index/term checks above (this comparison was previously inverted).
527 if (is.getTerm() != currentTerm) {
528 return is.getTerm() + "!=" + currentTerm;
534 return "message mismatch:" + in.getClass();
537 }.get(); // this extracts the received message
539 assertEquals("match", out);
// Verifies the leader's bookkeeping after a successful InstallSnapshotReply for the last
// chunk: no follower-to-snapshot state remains, one follower log entry is tracked, and the
// follower's match index / next index are snapshotIndex / snapshotIndex + 1.
544 public void testHandleInstallSnapshotReplyLastChunk() {
545 new JavaTestKit(getSystem()) {{
547 ActorRef followerActor = getTestActor();
549 Map<String, String> peerAddresses = new HashMap<>();
550 peerAddresses.put(followerActor.path().toString(),
551 followerActor.path().toString());
553 final int followersLastIndex = 2;
554 final int snapshotIndex = 3;
555 final int newEntryIndex = 4;
556 final int snapshotTerm = 1;
557 final int currentTerm = 2;
559 MockRaftActorContext actorContext =
560 (MockRaftActorContext) createActorContext();
561 actorContext.setPeerAddresses(peerAddresses);
562 actorContext.setCommitIndex(followersLastIndex);
564 MockLeader leader = new MockLeader(actorContext);
566 // Ignore initial heartbeat.
567 expectMsgClass(duration("5 seconds"), AppendEntries.class);
569 Map<String, String> leadersSnapshot = new HashMap<>();
570 leadersSnapshot.put("1", "A");
571 leadersSnapshot.put("2", "B");
572 leadersSnapshot.put("3", "C");
574 // set the snapshot variables in replicatedlog
576 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
577 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
578 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
580 ByteString bs = toByteString(leadersSnapshot);
581 leader.setSnapshot(Optional.of(bs));
582 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
// Advance through the chunks until the current chunk index is the last one.
583 while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
584 leader.getFollowerToSnapshot().getNextChunk();
585 leader.getFollowerToSnapshot().incrementChunkIndex();
589 actorContext.getReplicatedLog().removeFrom(0);
// Simulate the follower acknowledging the last chunk successfully.
591 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
592 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
593 leader.getFollowerToSnapshot().getChunkIndex(), true));
595 assertTrue(raftBehavior instanceof Leader);
597 assertEquals(0, leader.followerSnapshotSize());
598 assertEquals(1, leader.followerLogSize());
599 assertNotNull(leader.getFollower(followerActor.path().toString()));
600 FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
// NOTE(review): the next two assertions are identical; one of them is redundant or was
// meant to check a different value - confirm intent.
601 assertEquals(snapshotIndex, fli.getMatchIndex());
602 assertEquals(snapshotIndex, fli.getMatchIndex());
603 assertEquals(snapshotIndex + 1, fli.getNextIndex());
// Verifies chunk-by-chunk snapshot sending driven by InstallSnapshotReply messages: after
// SendInstallSnapshot the follower has received one InstallSnapshot (chunk 1 of 3); each
// successful reply yields one more, up to three; a further reply must not add a fourth.
// The follower is a MessageCollectorActor registered under the id "follower-reply".
// NOTE(review): the value returned by the overridden getSnapshotChunkSize() is on a line not
// shown here.
607 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
608 new JavaTestKit(getSystem()) {{
610 TestActorRef<MessageCollectorActor> followerActor =
611 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
613 Map<String, String> peerAddresses = new HashMap<>();
614 peerAddresses.put("follower-reply",
615 followerActor.path().toString());
617 final int followersLastIndex = 2;
618 final int snapshotIndex = 3;
619 final int snapshotTerm = 1;
620 final int currentTerm = 2;
622 MockRaftActorContext actorContext =
623 (MockRaftActorContext) createActorContext();
624 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
626 public int getSnapshotChunkSize() {
// Heartbeat and isolated-leader-check intervals are set to 9 and 10 seconds for this test.
630 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
631 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
633 actorContext.setConfigParams(configParams);
634 actorContext.setPeerAddresses(peerAddresses);
635 actorContext.setCommitIndex(followersLastIndex);
637 MockLeader leader = new MockLeader(actorContext);
639 Map<String, String> leadersSnapshot = new HashMap<>();
640 leadersSnapshot.put("1", "A");
641 leadersSnapshot.put("2", "B");
642 leadersSnapshot.put("3", "C");
644 // set the snapshot variables in replicatedlog
645 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
646 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
647 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
649 ByteString bs = toByteString(leadersSnapshot);
650 leader.setSnapshot(Optional.of(bs));
652 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
// First chunk: exactly one InstallSnapshot collected so far.
654 List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
655 InstallSnapshotMessages.InstallSnapshot.class);
657 assertEquals(1, objectList.size());
659 Object o = objectList.get(0);
660 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
662 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
664 assertEquals(1, installSnapshot.getChunkIndex());
665 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 1; a second InstallSnapshot is expected.
667 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
668 "follower-reply", installSnapshot.getChunkIndex(), true));
670 objectList = MessageCollectorActor.getAllMatching(followerActor,
671 InstallSnapshotMessages.InstallSnapshot.class);
673 assertEquals(2, objectList.size());
675 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
// Acknowledge the second message's chunk; a third InstallSnapshot is expected.
677 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
678 "follower-reply", installSnapshot.getChunkIndex(), true));
680 objectList = MessageCollectorActor.getAllMatching(followerActor,
681 InstallSnapshotMessages.InstallSnapshot.class);
683 assertEquals(3, objectList.size());
685 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
687 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
688 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
689 "follower-reply", installSnapshot.getChunkIndex(), true));
691 objectList = MessageCollectorActor.getAllMatching(followerActor,
692 InstallSnapshotMessages.InstallSnapshot.class);
694 // Count should still stay at 3
695 assertEquals(3, objectList.size());
// Verifies that an unsuccessful InstallSnapshotReply with chunk index -1 makes the leader
// send chunk 1 of 3 again on the next SendHeartBeat.
// The follower is a MessageCollectorActor named "follower"; it is stopped with a PoisonPill
// at the end of the test.
// NOTE(review): the value returned by the overridden getSnapshotChunkSize() is on a line not
// shown here.
701 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
702 new JavaTestKit(getSystem()) {{
704 TestActorRef<MessageCollectorActor> followerActor =
705 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
707 Map<String, String> peerAddresses = new HashMap<>();
708 peerAddresses.put(followerActor.path().toString(),
709 followerActor.path().toString());
711 final int followersLastIndex = 2;
712 final int snapshotIndex = 3;
713 final int snapshotTerm = 1;
714 final int currentTerm = 2;
716 MockRaftActorContext actorContext =
717 (MockRaftActorContext) createActorContext();
719 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
721 public int getSnapshotChunkSize() {
725 actorContext.setPeerAddresses(peerAddresses);
726 actorContext.setCommitIndex(followersLastIndex);
728 MockLeader leader = new MockLeader(actorContext);
730 Map<String, String> leadersSnapshot = new HashMap<>();
731 leadersSnapshot.put("1", "A");
732 leadersSnapshot.put("2", "B");
733 leadersSnapshot.put("3", "C");
735 // set the snapshot variables in replicatedlog
736 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
737 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
738 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
740 ByteString bs = toByteString(leadersSnapshot);
741 leader.setSnapshot(Optional.of(bs));
743 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
// NOTE(review): the result of this getAllMatching call is discarded.
745 MessageCollectorActor.getAllMatching(followerActor,
746 InstallSnapshotMessages.InstallSnapshot.class);
748 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
749 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
750 assertNotNull(installSnapshot);
752 assertEquals(1, installSnapshot.getChunkIndex());
753 assertEquals(3, installSnapshot.getTotalChunks());
// Forget the messages collected so far so the next lookup only sees new ones.
755 followerActor.underlyingActor().clear();
// Failed reply with chunk index -1.
757 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
758 followerActor.path().toString(), -1, false));
760 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
761 TimeUnit.MILLISECONDS);
763 leader.handleMessage(leaderActor, new SendHeartBeat());
// The first chunk is expected again.
765 installSnapshot = MessageCollectorActor.getFirstMatching(
766 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
767 assertNotNull(installSnapshot);
769 assertEquals(1, installSnapshot.getChunkIndex());
770 assertEquals(3, installSnapshot.getTotalChunks());
772 followerActor.tell(PoisonPill.getInstance(), getRef());
// Verifies the lastChunkHashCode field of consecutive InstallSnapshot messages: the first
// chunk carries AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, and the second chunk carries
// the hashCode() of the first chunk's data.
// The follower is a MessageCollectorActor named "follower-chunk"; it is stopped with a
// PoisonPill at the end of the test.
// NOTE(review): the value returned by the overridden getSnapshotChunkSize() is on a line not
// shown here.
777 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
778 new JavaTestKit(getSystem()) {
780 TestActorRef<MessageCollectorActor> followerActor =
781 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
783 Map<String, String> peerAddresses = new HashMap<>();
784 peerAddresses.put(followerActor.path().toString(),
785 followerActor.path().toString());
787 final int followersLastIndex = 2;
788 final int snapshotIndex = 3;
789 final int snapshotTerm = 1;
790 final int currentTerm = 2;
792 MockRaftActorContext actorContext =
793 (MockRaftActorContext) createActorContext();
795 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
797 public int getSnapshotChunkSize() {
801 actorContext.setPeerAddresses(peerAddresses);
802 actorContext.setCommitIndex(followersLastIndex);
804 MockLeader leader = new MockLeader(actorContext);
806 Map<String, String> leadersSnapshot = new HashMap<>();
807 leadersSnapshot.put("1", "A");
808 leadersSnapshot.put("2", "B");
809 leadersSnapshot.put("3", "C");
811 // set the snapshot variables in replicatedlog
812 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
813 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
814 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
816 ByteString bs = toByteString(leadersSnapshot);
817 leader.setSnapshot(Optional.of(bs));
819 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
821 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
822 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
823 assertNotNull(installSnapshot);
825 assertEquals(1, installSnapshot.getChunkIndex());
826 assertEquals(3, installSnapshot.getTotalChunks());
827 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
// Remember the hash of the first chunk's data; the next message must report it.
829 int hashCode = installSnapshot.getData().hashCode();
831 followerActor.underlyingActor().clear();
// Acknowledge chunk 1 successfully so the leader sends the next chunk.
833 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
835 installSnapshot = MessageCollectorActor.getFirstMatching(
836 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
837 assertNotNull(installSnapshot);
839 assertEquals(2, installSnapshot.getChunkIndex());
840 assertEquals(3, installSnapshot.getTotalChunks());
841 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
843 followerActor.tell(PoisonPill.getInstance(), getRef());
// Exercises the leader's follower-to-snapshot chunking directly (no actors involved): walks
// the serialized snapshot in steps of 50 bytes and checks, for each step, the size of the
// chunk returned by getNextChunk() and the current chunk index, then checks the total
// number of chunks.
// NOTE(review): `chunkIndex` and `j` are declared/updated on lines that are not part of this
// listing, as is the value returned by the overridden getSnapshotChunkSize(); the step of 50
// presumably mirrors that chunk size - confirm.
848 public void testFollowerToSnapshotLogic() {
850 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
852 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
854 public int getSnapshotChunkSize() {
859 MockLeader leader = new MockLeader(actorContext);
861 Map<String, String> leadersSnapshot = new HashMap<>();
862 leadersSnapshot.put("1", "A");
863 leadersSnapshot.put("2", "B");
864 leadersSnapshot.put("3", "C");
866 ByteString bs = toByteString(leadersSnapshot);
867 byte[] barray = bs.toByteArray();
869 leader.createFollowerToSnapshot("followerId", bs);
870 assertEquals(bs.size(), barray.length);
873 for (int i=0; i < barray.length; i = i + 50) {
// Special-cases the final step, where fewer than 50 bytes remain.
877 if (i + 50 > barray.length) {
881 ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
882 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
883 assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
// Mark the chunk as sent and move on unless this was the last chunk.
885 leader.getFollowerToSnapshot().markSendStatus(true);
886 if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
887 leader.getFollowerToSnapshot().incrementChunkIndex();
891 assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
// Supplies the behavior under test to the inherited tests: a new Leader built on the given context.
895 @Override protected RaftActorBehavior createBehavior(
896 RaftActorContext actorContext) {
897 return new Leader(actorContext);
// Creates the default test context, using the leaderActor field as the context's actor.
900 @Override protected RaftActorContext createActorContext() {
901 return createActorContext(leaderActor);
// Builds a MockRaftActorContext with id "test" for the given actor and installs config
// params with a 50 ms heartbeat interval and an election timeout factor of 100000.
// NOTE(review): the return statement is on a line that is not part of this listing.
905 protected RaftActorContext createActorContext(ActorRef actorRef) {
906 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
907 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
908 configParams.setElectionTimeoutFactor(100000);
909 MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), actorRef);
910 context.setConfigParams(configParams);
914 private ByteString toByteString(Map<String, String> state) {
915 ByteArrayOutputStream b = null;
916 ObjectOutputStream o = null;
919 b = new ByteArrayOutputStream();
920 o = new ObjectOutputStream(b);
921 o.writeObject(state);
922 byte[] snapshotBytes = b.toByteArray();
923 return ByteString.copyFrom(snapshotBytes);
933 } catch (IOException e) {
934 Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
939 public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
940 AbstractRaftActorBehavior behavior;
942 @Override public void onReceive(Object message) throws Exception {
943 if(behavior != null) {
944 behavior.handleMessage(sender(), message);
947 super.onReceive(message);
950 public static Props props() {
951 return Props.create(ForwardMessageToBehaviorActor.class);
956 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
957 new JavaTestKit(getSystem()) {{
958 TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
959 Props.create(ForwardMessageToBehaviorActor.class));
961 MockRaftActorContext leaderActorContext =
962 new MockRaftActorContext("leader", getSystem(), leaderActor);
964 TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
965 ForwardMessageToBehaviorActor.props());
967 MockRaftActorContext followerActorContext =
968 new MockRaftActorContext("follower", getSystem(), followerActor);
970 Follower follower = new Follower(followerActorContext);
971 followerActor.underlyingActor().behavior = follower;
973 Map<String, String> peerAddresses = new HashMap<>();
974 peerAddresses.put("follower", followerActor.path().toString());
976 leaderActorContext.setPeerAddresses(peerAddresses);
978 leaderActorContext.getReplicatedLog().removeFrom(0);
981 leaderActorContext.setReplicatedLog(
982 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
984 leaderActorContext.setCommitIndex(1);
986 followerActorContext.getReplicatedLog().removeFrom(0);
988 // follower too has the exact same log entries and has the same commit index
989 followerActorContext.setReplicatedLog(
990 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
992 followerActorContext.setCommitIndex(1);
994 Leader leader = new Leader(leaderActorContext);
996 AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
997 assertNotNull(appendEntries);
999 assertEquals(1, appendEntries.getLeaderCommit());
1000 assertEquals(0, appendEntries.getEntries().size());
1001 assertEquals(0, appendEntries.getPrevLogIndex());
1003 AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
1004 leaderActor, AppendEntriesReply.class);
1005 assertNotNull(appendEntriesReply);
1007 assertEquals(2, appendEntriesReply.getLogLastIndex());
1008 assertEquals(1, appendEntriesReply.getLogLastTerm());
1010 // follower returns its next index
1011 assertEquals(2, appendEntriesReply.getLogLastIndex());
1012 assertEquals(1, appendEntriesReply.getLogLastTerm());
1018 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1019 new JavaTestKit(getSystem()) {{
1020 TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
1021 Props.create(ForwardMessageToBehaviorActor.class));
1023 MockRaftActorContext leaderActorContext =
1024 new MockRaftActorContext("leader", getSystem(), leaderActor);
1026 TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
1027 ForwardMessageToBehaviorActor.props());
1029 MockRaftActorContext followerActorContext =
1030 new MockRaftActorContext("follower", getSystem(), followerActor);
1032 Follower follower = new Follower(followerActorContext);
1033 followerActor.underlyingActor().behavior = follower;
1035 Map<String, String> peerAddresses = new HashMap<>();
1036 peerAddresses.put("follower", followerActor.path().toString());
1038 leaderActorContext.setPeerAddresses(peerAddresses);
1040 leaderActorContext.getReplicatedLog().removeFrom(0);
1042 leaderActorContext.setReplicatedLog(
1043 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1045 leaderActorContext.setCommitIndex(1);
1047 followerActorContext.getReplicatedLog().removeFrom(0);
1049 followerActorContext.setReplicatedLog(
1050 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1052 // follower has the same log entries but its commit index > leaders commit index
1053 followerActorContext.setCommitIndex(2);
1055 Leader leader = new Leader(leaderActorContext);
1057 // Initial heartbeat
1058 AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1059 assertNotNull(appendEntries);
1061 assertEquals(1, appendEntries.getLeaderCommit());
1062 assertEquals(0, appendEntries.getEntries().size());
1063 assertEquals(0, appendEntries.getPrevLogIndex());
1065 AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
1066 leaderActor, AppendEntriesReply.class);
1067 assertNotNull(appendEntriesReply);
1069 assertEquals(2, appendEntriesReply.getLogLastIndex());
1070 assertEquals(1, appendEntriesReply.getLogLastTerm());
1072 leaderActor.underlyingActor().behavior = leader;
1073 leader.handleMessage(followerActor, appendEntriesReply);
1075 leaderActor.underlyingActor().clear();
1076 followerActor.underlyingActor().clear();
1078 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1079 TimeUnit.MILLISECONDS);
1081 leader.handleMessage(leaderActor, new SendHeartBeat());
1083 appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1084 assertNotNull(appendEntries);
1086 assertEquals(1, appendEntries.getLeaderCommit());
1087 assertEquals(0, appendEntries.getEntries().size());
1088 assertEquals(2, appendEntries.getPrevLogIndex());
1090 appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
1091 assertNotNull(appendEntriesReply);
1093 assertEquals(2, appendEntriesReply.getLogLastIndex());
1094 assertEquals(1, appendEntriesReply.getLogLastTerm());
1096 assertEquals(1, followerActorContext.getCommitIndex());
1101 public void testHandleAppendEntriesReplyFailure(){
1102 new JavaTestKit(getSystem()) {
1105 ActorRef leaderActor =
1106 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1108 ActorRef followerActor =
1109 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1112 MockRaftActorContext leaderActorContext =
1113 new MockRaftActorContext("leader", getSystem(), leaderActor);
1115 Map<String, String> peerAddresses = new HashMap<>();
1116 peerAddresses.put("follower-1",
1117 followerActor.path().toString());
1119 leaderActorContext.setPeerAddresses(peerAddresses);
1121 Leader leader = new Leader(leaderActorContext);
1123 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1125 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1127 assertEquals(RaftState.Leader, raftActorBehavior.state());
1133 public void testHandleAppendEntriesReplySuccess() throws Exception {
1134 new JavaTestKit(getSystem()) {
1137 ActorRef leaderActor =
1138 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1140 ActorRef followerActor =
1141 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1144 MockRaftActorContext leaderActorContext =
1145 new MockRaftActorContext("leader", getSystem(), leaderActor);
1147 leaderActorContext.setReplicatedLog(
1148 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1150 Map<String, String> peerAddresses = new HashMap<>();
1151 peerAddresses.put("follower-1",
1152 followerActor.path().toString());
1154 leaderActorContext.setPeerAddresses(peerAddresses);
1155 leaderActorContext.setCommitIndex(1);
1156 leaderActorContext.setLastApplied(1);
1157 leaderActorContext.getTermInformation().update(1, "leader");
1159 Leader leader = new Leader(leaderActorContext);
1161 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1163 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1165 assertEquals(RaftState.Leader, raftActorBehavior.state());
1167 assertEquals(2, leaderActorContext.getCommitIndex());
1169 ApplyLogEntries applyLogEntries =
1170 MessageCollectorActor.getFirstMatching(leaderActor,
1171 ApplyLogEntries.class);
1173 assertNotNull(applyLogEntries);
1175 assertEquals(2, leaderActorContext.getLastApplied());
1177 assertEquals(2, applyLogEntries.getToIndex());
1179 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1182 assertEquals(1,applyStateList.size());
1184 ApplyState applyState = (ApplyState) applyStateList.get(0);
1186 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1192 public void testHandleAppendEntriesReplyUnknownFollower(){
1193 new JavaTestKit(getSystem()) {
1196 ActorRef leaderActor =
1197 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1199 MockRaftActorContext leaderActorContext =
1200 new MockRaftActorContext("leader", getSystem(), leaderActor);
1202 Leader leader = new Leader(leaderActorContext);
1204 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1206 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1208 assertEquals(RaftState.Leader, raftActorBehavior.state());
1214 public void testHandleRequestVoteReply(){
1215 new JavaTestKit(getSystem()) {
1218 ActorRef leaderActor =
1219 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1221 MockRaftActorContext leaderActorContext =
1222 new MockRaftActorContext("leader", getSystem(), leaderActor);
1224 Leader leader = new Leader(leaderActorContext);
1226 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1228 assertEquals(RaftState.Leader, raftActorBehavior.state());
1230 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1232 assertEquals(RaftState.Leader, raftActorBehavior.state());
1237 public void testIsolatedLeaderCheckNoFollowers() {
1238 new JavaTestKit(getSystem()) {{
1239 ActorRef leaderActor = getTestActor();
1241 MockRaftActorContext leaderActorContext =
1242 new MockRaftActorContext("leader", getSystem(), leaderActor);
1244 Map<String, String> peerAddresses = new HashMap<>();
1245 leaderActorContext.setPeerAddresses(peerAddresses);
1247 Leader leader = new Leader(leaderActorContext);
1248 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1249 Assert.assertTrue(behavior instanceof Leader);
1254 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1255 new JavaTestKit(getSystem()) {{
1257 ActorRef followerActor1 = getTestActor();
1258 ActorRef followerActor2 = getTestActor();
1260 MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1262 Map<String, String> peerAddresses = new HashMap<>();
1263 peerAddresses.put("follower-1", followerActor1.path().toString());
1264 peerAddresses.put("follower-2", followerActor2.path().toString());
1266 leaderActorContext.setPeerAddresses(peerAddresses);
1268 Leader leader = new Leader(leaderActorContext);
1269 leader.stopIsolatedLeaderCheckSchedule();
1271 leader.markFollowerActive("follower-1");
1272 leader.markFollowerActive("follower-2");
1273 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1274 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1275 behavior instanceof Leader);
1277 // kill 1 follower and verify if that got killed
1278 final JavaTestKit probe = new JavaTestKit(getSystem());
1279 probe.watch(followerActor1);
1280 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1281 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1282 assertEquals(termMsg1.getActor(), followerActor1);
1284 leader.markFollowerInActive("follower-1");
1285 leader.markFollowerActive("follower-2");
1286 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1287 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1288 behavior instanceof Leader);
1290 // kill 2nd follower and leader should change to Isolated leader
1291 followerActor2.tell(PoisonPill.getInstance(), null);
1292 probe.watch(followerActor2);
1293 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1294 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1295 assertEquals(termMsg2.getActor(), followerActor2);
1297 leader.markFollowerInActive("follower-2");
1298 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1299 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1300 behavior instanceof IsolatedLeader);
1307 public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1308 new JavaTestKit(getSystem()) {{
1309 TestActorRef<MessageCollectorActor> leaderActor = TestActorRef.create(getSystem(),
1310 Props.create(MessageCollectorActor.class));
1312 MockRaftActorContext leaderActorContext =
1313 new MockRaftActorContext("leader", getSystem(), leaderActor);
1315 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1316 //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1317 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1319 leaderActorContext.setConfigParams(configParams);
1321 TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
1322 ForwardMessageToBehaviorActor.props());
1324 MockRaftActorContext followerActorContext =
1325 new MockRaftActorContext("follower-reply", getSystem(), followerActor);
1327 followerActorContext.setConfigParams(configParams);
1329 Follower follower = new Follower(followerActorContext);
1330 followerActor.underlyingActor().behavior = follower;
1332 Map<String, String> peerAddresses = new HashMap<>();
1333 peerAddresses.put("follower-reply",
1334 followerActor.path().toString());
1336 leaderActorContext.setPeerAddresses(peerAddresses);
1338 leaderActorContext.getReplicatedLog().removeFrom(0);
1339 leaderActorContext.setCommitIndex(-1);
1340 leaderActorContext.setLastApplied(-1);
1342 followerActorContext.getReplicatedLog().removeFrom(0);
1343 followerActorContext.setCommitIndex(-1);
1344 followerActorContext.setLastApplied(-1);
1346 Leader leader = new Leader(leaderActorContext);
1348 AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
1349 leaderActor, AppendEntriesReply.class);
1350 assertNotNull(appendEntriesReply);
1351 System.out.println("appendEntriesReply: "+appendEntriesReply);
1352 leader.handleMessage(followerActor, appendEntriesReply);
1354 // Clear initial heartbeat messages
1356 leaderActor.underlyingActor().clear();
1357 followerActor.underlyingActor().clear();
1360 leaderActorContext.setReplicatedLog(
1361 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1362 leaderActorContext.setCommitIndex(1);
1363 leaderActorContext.setLastApplied(1);
1365 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1366 TimeUnit.MILLISECONDS);
1368 leader.handleMessage(leaderActor, new SendHeartBeat());
1370 AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1371 assertNotNull(appendEntries);
1373 // Should send first log entry
1374 assertEquals(1, appendEntries.getLeaderCommit());
1375 assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1376 assertEquals(-1, appendEntries.getPrevLogIndex());
1378 appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
1379 assertNotNull(appendEntriesReply);
1381 assertEquals(1, appendEntriesReply.getLogLastTerm());
1382 assertEquals(0, appendEntriesReply.getLogLastIndex());
1384 followerActor.underlyingActor().clear();
1386 leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1388 appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1389 assertNotNull(appendEntries);
1391 // Should send second log entry
1392 assertEquals(1, appendEntries.getLeaderCommit());
1393 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1397 class MockLeader extends Leader {
1399 FollowerToSnapshot fts;
1401 public MockLeader(RaftActorContext context){
1405 public FollowerToSnapshot getFollowerToSnapshot() {
1409 public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1410 fts = new FollowerToSnapshot(bs);
1411 setFollowerSnapshot(followerId, fts);
1415 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1417 private final long electionTimeOutIntervalMillis;
1418 private final int snapshotChunkSize;
1420 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1422 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1423 this.snapshotChunkSize = snapshotChunkSize;
1427 public FiniteDuration getElectionTimeOutInterval() {
1428 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1432 public int getSnapshotChunkSize() {
1433 return snapshotChunkSize;