1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import akka.actor.ActorRef;
4 import akka.actor.PoisonPill;
5 import akka.actor.Props;
6 import akka.actor.Terminated;
7 import akka.testkit.JavaTestKit;
8 import akka.testkit.TestActorRef;
9 import com.google.common.base.Optional;
10 import com.google.common.util.concurrent.Uninterruptibles;
11 import com.google.protobuf.ByteString;
12 import java.io.ByteArrayOutputStream;
13 import java.io.IOException;
14 import java.io.ObjectOutputStream;
15 import java.util.HashMap;
16 import java.util.List;
18 import java.util.concurrent.TimeUnit;
19 import org.junit.Assert;
20 import org.junit.Test;
21 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
22 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
23 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
24 import org.opendaylight.controller.cluster.raft.RaftActorContext;
25 import org.opendaylight.controller.cluster.raft.RaftState;
26 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
27 import org.opendaylight.controller.cluster.raft.SerializationUtils;
28 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
30 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
31 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
32 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
33 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
34 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
36 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
37 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
38 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
40 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
41 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
42 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
43 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
44 import scala.concurrent.duration.FiniteDuration;
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertNotNull;
48 import static org.junit.Assert.assertTrue;
50 public class LeaderTest extends AbstractRaftActorBehaviorTest {
52 private final ActorRef leaderActor =
53 getSystem().actorOf(Props.create(DoNothingActor.class));
54 private final ActorRef senderActor =
55 getSystem().actorOf(Props.create(DoNothingActor.class));
58 public void testHandleMessageForUnknownMessage() throws Exception {
59 new JavaTestKit(getSystem()) {{
61 new Leader(createActorContext());
63 // handle message should return the Leader state when it receives an
65 RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
66 Assert.assertTrue(behavior instanceof Leader);
71 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
72 new JavaTestKit(getSystem()) {{
74 new Within(duration("1 seconds")) {
76 protected void run() {
78 ActorRef followerActor = getTestActor();
80 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
82 Map<String, String> peerAddresses = new HashMap<>();
84 peerAddresses.put(followerActor.path().toString(),
85 followerActor.path().toString());
87 actorContext.setPeerAddresses(peerAddresses);
89 Leader leader = new Leader(actorContext);
90 leader.markFollowerActive(followerActor.path().toString());
91 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
92 TimeUnit.MILLISECONDS);
93 leader.handleMessage(senderActor, new SendHeartBeat());
96 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
97 // do not put code outside this method, will run afterwards
99 protected String match(Object in) {
100 Object msg = fromSerializableMessage(in);
101 if (msg instanceof AppendEntries) {
102 if (((AppendEntries)msg).getTerm() == 0) {
110 }.get(); // this extracts the received message
112 assertEquals("match", out);
120 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
121 new JavaTestKit(getSystem()) {{
123 new Within(duration("1 seconds")) {
125 protected void run() {
127 ActorRef followerActor = getTestActor();
129 MockRaftActorContext actorContext =
130 (MockRaftActorContext) createActorContext();
132 Map<String, String> peerAddresses = new HashMap<>();
134 peerAddresses.put(followerActor.path().toString(),
135 followerActor.path().toString());
137 actorContext.setPeerAddresses(peerAddresses);
139 Leader leader = new Leader(actorContext);
140 leader.markFollowerActive(followerActor.path().toString());
141 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
142 TimeUnit.MILLISECONDS);
143 RaftActorBehavior raftBehavior = leader
144 .handleMessage(senderActor, new Replicate(null, null,
145 new MockRaftActorContext.MockReplicatedLogEntry(1,
147 new MockRaftActorContext.MockPayload("foo"))
150 // State should not change
151 assertTrue(raftBehavior instanceof Leader);
154 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
155 // do not put code outside this method, will run afterwards
157 protected String match(Object in) {
158 Object msg = fromSerializableMessage(in);
159 if (msg instanceof AppendEntries) {
160 if (((AppendEntries)msg).getTerm() == 0) {
168 }.get(); // this extracts the received message
170 assertEquals("match", out);
177 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
178 new JavaTestKit(getSystem()) {{
180 new Within(duration("1 seconds")) {
182 protected void run() {
184 ActorRef raftActor = getTestActor();
186 MockRaftActorContext actorContext =
187 new MockRaftActorContext("test", getSystem(), raftActor);
189 actorContext.getReplicatedLog().removeFrom(0);
191 actorContext.setReplicatedLog(
192 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
195 Leader leader = new Leader(actorContext);
196 RaftActorBehavior raftBehavior = leader
197 .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
199 // State should not change
200 assertTrue(raftBehavior instanceof Leader);
202 assertEquals(1, actorContext.getCommitIndex());
205 new ExpectMsg<String>(duration("1 seconds"),
207 // do not put code outside this method, will run afterwards
209 protected String match(Object in) {
210 if (in instanceof ApplyState) {
211 if (((ApplyState) in).getIdentifier().equals("state-id")) {
219 }.get(); // this extracts the received message
221 assertEquals("match", out);
229 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
230 new JavaTestKit(getSystem()) {{
231 ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
233 Map<String, String> peerAddresses = new HashMap<>();
234 peerAddresses.put(followerActor.path().toString(),
235 followerActor.path().toString());
237 MockRaftActorContext actorContext =
238 (MockRaftActorContext) createActorContext(leaderActor);
239 actorContext.setPeerAddresses(peerAddresses);
241 Map<String, String> leadersSnapshot = new HashMap<>();
242 leadersSnapshot.put("1", "A");
243 leadersSnapshot.put("2", "B");
244 leadersSnapshot.put("3", "C");
247 actorContext.getReplicatedLog().removeFrom(0);
249 final int followersLastIndex = 2;
250 final int snapshotIndex = 3;
251 final int newEntryIndex = 4;
252 final int snapshotTerm = 1;
253 final int currentTerm = 2;
255 // set the snapshot variables in replicatedlog
256 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
257 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
258 actorContext.setCommitIndex(followersLastIndex);
259 //set follower timeout to 2 mins, helps during debugging
260 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
262 MockLeader leader = new MockLeader(actorContext);
265 ReplicatedLogImplEntry entry =
266 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
267 new MockRaftActorContext.MockPayload("D"));
269 //update follower timestamp
270 leader.markFollowerActive(followerActor.path().toString());
272 ByteString bs = toByteString(leadersSnapshot);
273 leader.setSnapshot(Optional.of(bs));
274 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
276 //send first chunk and no InstallSnapshotReply received yet
277 leader.getFollowerToSnapshot().getNextChunk();
278 leader.getFollowerToSnapshot().incrementChunkIndex();
280 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
281 TimeUnit.MILLISECONDS);
283 leader.handleMessage(leaderActor, new SendHeartBeat());
285 AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
286 followerActor, AppendEntries.class);
288 assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
289 "received", aeproto);
291 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
293 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
295 //InstallSnapshotReply received
296 leader.getFollowerToSnapshot().markSendStatus(true);
298 leader.handleMessage(senderActor, new SendHeartBeat());
300 InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
301 MessageCollectorActor.getFirstMatching(followerActor,
302 InstallSnapshot.SERIALIZABLE_CLASS);
304 assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
307 InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
309 assertEquals(snapshotIndex, is.getLastIncludedIndex());
315 public void testSendAppendEntriesSnapshotScenario() {
316 new JavaTestKit(getSystem()) {{
318 ActorRef followerActor = getTestActor();
320 Map<String, String> peerAddresses = new HashMap<>();
321 peerAddresses.put(followerActor.path().toString(),
322 followerActor.path().toString());
324 MockRaftActorContext actorContext =
325 (MockRaftActorContext) createActorContext(getRef());
326 actorContext.setPeerAddresses(peerAddresses);
328 Map<String, String> leadersSnapshot = new HashMap<>();
329 leadersSnapshot.put("1", "A");
330 leadersSnapshot.put("2", "B");
331 leadersSnapshot.put("3", "C");
334 actorContext.getReplicatedLog().removeFrom(0);
336 final int followersLastIndex = 2;
337 final int snapshotIndex = 3;
338 final int newEntryIndex = 4;
339 final int snapshotTerm = 1;
340 final int currentTerm = 2;
342 // set the snapshot variables in replicatedlog
343 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
344 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
345 actorContext.setCommitIndex(followersLastIndex);
347 Leader leader = new Leader(actorContext);
350 ReplicatedLogImplEntry entry =
351 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
352 new MockRaftActorContext.MockPayload("D"));
354 //update follower timestamp
355 leader.markFollowerActive(followerActor.path().toString());
357 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
358 TimeUnit.MILLISECONDS);
360 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
361 RaftActorBehavior raftBehavior = leader.handleMessage(
362 senderActor, new Replicate(null, "state-id", entry));
364 assertTrue(raftBehavior instanceof Leader);
366 // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
367 Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
369 protected Boolean match(Object o) throws Exception {
370 if (o instanceof InitiateInstallSnapshot) {
377 boolean initiateInitiateInstallSnapshot = false;
378 for (Boolean b: matches) {
379 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
382 assertTrue(initiateInitiateInstallSnapshot);
387 public void testInitiateInstallSnapshot() throws Exception {
388 new JavaTestKit(getSystem()) {{
390 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
392 ActorRef followerActor = getTestActor();
394 Map<String, String> peerAddresses = new HashMap<>();
395 peerAddresses.put(followerActor.path().toString(),
396 followerActor.path().toString());
399 MockRaftActorContext actorContext =
400 (MockRaftActorContext) createActorContext(leaderActor);
401 actorContext.setPeerAddresses(peerAddresses);
403 Map<String, String> leadersSnapshot = new HashMap<>();
404 leadersSnapshot.put("1", "A");
405 leadersSnapshot.put("2", "B");
406 leadersSnapshot.put("3", "C");
409 actorContext.getReplicatedLog().removeFrom(0);
411 final int followersLastIndex = 2;
412 final int snapshotIndex = 3;
413 final int newEntryIndex = 4;
414 final int snapshotTerm = 1;
415 final int currentTerm = 2;
417 // set the snapshot variables in replicatedlog
418 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
419 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
420 actorContext.setLastApplied(3);
421 actorContext.setCommitIndex(followersLastIndex);
423 Leader leader = new Leader(actorContext);
424 // set the snapshot as absent and check if capture-snapshot is invoked.
425 leader.setSnapshot(Optional.<ByteString>absent());
428 ReplicatedLogImplEntry entry =
429 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
430 new MockRaftActorContext.MockPayload("D"));
432 actorContext.getReplicatedLog().append(entry);
434 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
435 RaftActorBehavior raftBehavior = leader.handleMessage(
436 leaderActor, new InitiateInstallSnapshot());
438 CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
439 getFirstMatching(leaderActor, CaptureSnapshot.class);
443 assertTrue(cs.isInstallSnapshotInitiated());
444 assertEquals(3, cs.getLastAppliedIndex());
445 assertEquals(1, cs.getLastAppliedTerm());
446 assertEquals(4, cs.getLastIndex());
447 assertEquals(2, cs.getLastTerm());
449 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
450 raftBehavior = leader.handleMessage(leaderActor, new InitiateInstallSnapshot());
451 List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
452 assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
458 public void testInstallSnapshot() {
459 new JavaTestKit(getSystem()) {{
461 ActorRef followerActor = getTestActor();
463 Map<String, String> peerAddresses = new HashMap<>();
464 peerAddresses.put(followerActor.path().toString(),
465 followerActor.path().toString());
467 MockRaftActorContext actorContext =
468 (MockRaftActorContext) createActorContext();
469 actorContext.setPeerAddresses(peerAddresses);
472 Map<String, String> leadersSnapshot = new HashMap<>();
473 leadersSnapshot.put("1", "A");
474 leadersSnapshot.put("2", "B");
475 leadersSnapshot.put("3", "C");
478 actorContext.getReplicatedLog().removeFrom(0);
480 final int followersLastIndex = 2;
481 final int snapshotIndex = 3;
482 final int newEntryIndex = 4;
483 final int snapshotTerm = 1;
484 final int currentTerm = 2;
486 // set the snapshot variables in replicatedlog
487 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
488 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
489 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
490 actorContext.setCommitIndex(followersLastIndex);
492 Leader leader = new Leader(actorContext);
495 ReplicatedLogImplEntry entry =
496 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
497 new MockRaftActorContext.MockPayload("D"));
499 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
500 new SendInstallSnapshot(toByteString(leadersSnapshot)));
502 assertTrue(raftBehavior instanceof Leader);
504 // check if installsnapshot gets called with the correct values.
506 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
507 // do not put code outside this method, will run afterwards
509 protected String match(Object in) {
510 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
511 InstallSnapshot is = (InstallSnapshot)
512 SerializationUtils.fromSerializable(in);
513 if (is.getData() == null) {
514 return "InstallSnapshot data is null";
516 if (is.getLastIncludedIndex() != snapshotIndex) {
517 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
519 if (is.getLastIncludedTerm() != snapshotTerm) {
520 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
522 if (is.getTerm() == currentTerm) {
523 return is.getTerm() + "!=" + currentTerm;
529 return "message mismatch:" + in.getClass();
532 }.get(); // this extracts the received message
534 assertEquals("match", out);
539 public void testHandleInstallSnapshotReplyLastChunk() {
540 new JavaTestKit(getSystem()) {{
542 ActorRef followerActor = getTestActor();
544 Map<String, String> peerAddresses = new HashMap<>();
545 peerAddresses.put(followerActor.path().toString(),
546 followerActor.path().toString());
548 final int followersLastIndex = 2;
549 final int snapshotIndex = 3;
550 final int newEntryIndex = 4;
551 final int snapshotTerm = 1;
552 final int currentTerm = 2;
554 MockRaftActorContext actorContext =
555 (MockRaftActorContext) createActorContext();
556 actorContext.setPeerAddresses(peerAddresses);
557 actorContext.setCommitIndex(followersLastIndex);
559 MockLeader leader = new MockLeader(actorContext);
561 Map<String, String> leadersSnapshot = new HashMap<>();
562 leadersSnapshot.put("1", "A");
563 leadersSnapshot.put("2", "B");
564 leadersSnapshot.put("3", "C");
566 // set the snapshot variables in replicatedlog
568 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
569 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
570 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
572 ByteString bs = toByteString(leadersSnapshot);
573 leader.setSnapshot(Optional.of(bs));
574 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
575 while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
576 leader.getFollowerToSnapshot().getNextChunk();
577 leader.getFollowerToSnapshot().incrementChunkIndex();
581 actorContext.getReplicatedLog().removeFrom(0);
583 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
584 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
585 leader.getFollowerToSnapshot().getChunkIndex(), true));
587 assertTrue(raftBehavior instanceof Leader);
589 assertEquals(0, leader.followerSnapshotSize());
590 assertEquals(1, leader.followerLogSize());
591 assertNotNull(leader.getFollower(followerActor.path().toString()));
592 FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
593 assertEquals(snapshotIndex, fli.getMatchIndex());
594 assertEquals(snapshotIndex, fli.getMatchIndex());
595 assertEquals(snapshotIndex + 1, fli.getNextIndex());
599 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
600 new JavaTestKit(getSystem()) {{
602 TestActorRef<MessageCollectorActor> followerActor =
603 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
605 Map<String, String> peerAddresses = new HashMap<>();
606 peerAddresses.put("follower-reply",
607 followerActor.path().toString());
609 final int followersLastIndex = 2;
610 final int snapshotIndex = 3;
611 final int snapshotTerm = 1;
612 final int currentTerm = 2;
614 MockRaftActorContext actorContext =
615 (MockRaftActorContext) createActorContext();
616 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
618 public int getSnapshotChunkSize() {
622 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
623 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
625 actorContext.setConfigParams(configParams);
626 actorContext.setPeerAddresses(peerAddresses);
627 actorContext.setCommitIndex(followersLastIndex);
629 MockLeader leader = new MockLeader(actorContext);
631 Map<String, String> leadersSnapshot = new HashMap<>();
632 leadersSnapshot.put("1", "A");
633 leadersSnapshot.put("2", "B");
634 leadersSnapshot.put("3", "C");
636 // set the snapshot variables in replicatedlog
637 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
638 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
639 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
641 ByteString bs = toByteString(leadersSnapshot);
642 leader.setSnapshot(Optional.of(bs));
644 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
646 List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
647 InstallSnapshotMessages.InstallSnapshot.class);
649 assertEquals(1, objectList.size());
651 Object o = objectList.get(0);
652 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
654 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
656 assertEquals(1, installSnapshot.getChunkIndex());
657 assertEquals(3, installSnapshot.getTotalChunks());
659 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
660 "follower-reply", installSnapshot.getChunkIndex(), true));
662 objectList = MessageCollectorActor.getAllMatching(followerActor,
663 InstallSnapshotMessages.InstallSnapshot.class);
665 assertEquals(2, objectList.size());
667 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
669 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
670 "follower-reply", installSnapshot.getChunkIndex(), true));
672 objectList = MessageCollectorActor.getAllMatching(followerActor,
673 InstallSnapshotMessages.InstallSnapshot.class);
675 assertEquals(3, objectList.size());
677 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
679 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
680 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
681 "follower-reply", installSnapshot.getChunkIndex(), true));
683 objectList = MessageCollectorActor.getAllMatching(followerActor,
684 InstallSnapshotMessages.InstallSnapshot.class);
686 // Count should still stay at 3
687 assertEquals(3, objectList.size());
693 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
694 new JavaTestKit(getSystem()) {{
696 TestActorRef<MessageCollectorActor> followerActor =
697 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
699 Map<String, String> peerAddresses = new HashMap<>();
700 peerAddresses.put(followerActor.path().toString(),
701 followerActor.path().toString());
703 final int followersLastIndex = 2;
704 final int snapshotIndex = 3;
705 final int snapshotTerm = 1;
706 final int currentTerm = 2;
708 MockRaftActorContext actorContext =
709 (MockRaftActorContext) createActorContext();
711 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
713 public int getSnapshotChunkSize() {
717 actorContext.setPeerAddresses(peerAddresses);
718 actorContext.setCommitIndex(followersLastIndex);
720 MockLeader leader = new MockLeader(actorContext);
722 Map<String, String> leadersSnapshot = new HashMap<>();
723 leadersSnapshot.put("1", "A");
724 leadersSnapshot.put("2", "B");
725 leadersSnapshot.put("3", "C");
727 // set the snapshot variables in replicatedlog
728 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
729 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
730 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
732 ByteString bs = toByteString(leadersSnapshot);
733 leader.setSnapshot(Optional.of(bs));
735 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
737 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
739 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
741 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
743 assertEquals(1, installSnapshot.getChunkIndex());
744 assertEquals(3, installSnapshot.getTotalChunks());
747 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
748 followerActor.path().toString(), -1, false));
750 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
751 TimeUnit.MILLISECONDS);
753 leader.handleMessage(leaderActor, new SendHeartBeat());
755 o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
757 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
759 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
761 assertEquals(1, installSnapshot.getChunkIndex());
762 assertEquals(3, installSnapshot.getTotalChunks());
764 followerActor.tell(PoisonPill.getInstance(), getRef());
769 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
770 new JavaTestKit(getSystem()) {
773 TestActorRef<MessageCollectorActor> followerActor =
774 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
776 Map<String, String> peerAddresses = new HashMap<>();
777 peerAddresses.put(followerActor.path().toString(),
778 followerActor.path().toString());
780 final int followersLastIndex = 2;
781 final int snapshotIndex = 3;
782 final int snapshotTerm = 1;
783 final int currentTerm = 2;
785 MockRaftActorContext actorContext =
786 (MockRaftActorContext) createActorContext();
788 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
790 public int getSnapshotChunkSize() {
794 actorContext.setPeerAddresses(peerAddresses);
795 actorContext.setCommitIndex(followersLastIndex);
797 MockLeader leader = new MockLeader(actorContext);
799 Map<String, String> leadersSnapshot = new HashMap<>();
800 leadersSnapshot.put("1", "A");
801 leadersSnapshot.put("2", "B");
802 leadersSnapshot.put("3", "C");
804 // set the snapshot variables in replicatedlog
805 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
806 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
807 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
809 ByteString bs = toByteString(leadersSnapshot);
810 leader.setSnapshot(Optional.of(bs));
812 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
814 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
816 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
818 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
820 assertEquals(1, installSnapshot.getChunkIndex());
821 assertEquals(3, installSnapshot.getTotalChunks());
822 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
824 int hashCode = installSnapshot.getData().hashCode();
826 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
828 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
830 leader.handleMessage(leaderActor, new SendHeartBeat());
832 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
834 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
836 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
838 assertEquals(2, installSnapshot.getChunkIndex());
839 assertEquals(3, installSnapshot.getTotalChunks());
840 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
842 followerActor.tell(PoisonPill.getInstance(), getRef());
847 public void testFollowerToSnapshotLogic() {
849 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
851 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
853 public int getSnapshotChunkSize() {
858 MockLeader leader = new MockLeader(actorContext);
860 Map<String, String> leadersSnapshot = new HashMap<>();
861 leadersSnapshot.put("1", "A");
862 leadersSnapshot.put("2", "B");
863 leadersSnapshot.put("3", "C");
865 ByteString bs = toByteString(leadersSnapshot);
866 byte[] barray = bs.toByteArray();
868 leader.createFollowerToSnapshot("followerId", bs);
869 assertEquals(bs.size(), barray.length);
872 for (int i=0; i < barray.length; i = i + 50) {
876 if (i + 50 > barray.length) {
880 ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
881 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
882 assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
884 leader.getFollowerToSnapshot().markSendStatus(true);
885 if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
886 leader.getFollowerToSnapshot().incrementChunkIndex();
890 assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
894 @Override protected RaftActorBehavior createBehavior(
895 RaftActorContext actorContext) {
896 return new Leader(actorContext);
899 @Override protected RaftActorContext createActorContext() {
900 return createActorContext(leaderActor);
904 protected RaftActorContext createActorContext(ActorRef actorRef) {
905 return new MockRaftActorContext("test", getSystem(), actorRef);
908 private ByteString toByteString(Map<String, String> state) {
909 ByteArrayOutputStream b = null;
910 ObjectOutputStream o = null;
913 b = new ByteArrayOutputStream();
914 o = new ObjectOutputStream(b);
915 o.writeObject(state);
916 byte[] snapshotBytes = b.toByteArray();
917 return ByteString.copyFrom(snapshotBytes);
927 } catch (IOException e) {
928 Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
933 public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
934 private static AbstractRaftActorBehavior behavior;
936 public ForwardMessageToBehaviorActor(){
940 @Override public void onReceive(Object message) throws Exception {
941 super.onReceive(message);
942 behavior.handleMessage(sender(), message);
945 public static void setBehavior(AbstractRaftActorBehavior behavior){
946 ForwardMessageToBehaviorActor.behavior = behavior;
951 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
952 new JavaTestKit(getSystem()) {{
954 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
956 MockRaftActorContext leaderActorContext =
957 new MockRaftActorContext("leader", getSystem(), leaderActor);
959 ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
961 MockRaftActorContext followerActorContext =
962 new MockRaftActorContext("follower", getSystem(), followerActor);
964 Follower follower = new Follower(followerActorContext);
966 ForwardMessageToBehaviorActor.setBehavior(follower);
968 Map<String, String> peerAddresses = new HashMap<>();
969 peerAddresses.put(followerActor.path().toString(),
970 followerActor.path().toString());
972 leaderActorContext.setPeerAddresses(peerAddresses);
974 leaderActorContext.getReplicatedLog().removeFrom(0);
977 leaderActorContext.setReplicatedLog(
978 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
980 leaderActorContext.setCommitIndex(1);
982 followerActorContext.getReplicatedLog().removeFrom(0);
984 // follower too has the exact same log entries and has the same commit index
985 followerActorContext.setReplicatedLog(
986 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
988 followerActorContext.setCommitIndex(1);
990 Leader leader = new Leader(leaderActorContext);
991 leader.markFollowerActive(followerActor.path().toString());
993 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
994 TimeUnit.MILLISECONDS);
996 leader.handleMessage(leaderActor, new SendHeartBeat());
998 AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
999 .getFirstMatching(followerActor, AppendEntries.class);
1001 assertNotNull(appendEntries);
1003 assertEquals(1, appendEntries.getLeaderCommit());
1004 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1005 assertEquals(0, appendEntries.getPrevLogIndex());
1007 AppendEntriesReply appendEntriesReply =
1008 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
1009 leaderActor, AppendEntriesReply.class);
1011 assertNotNull(appendEntriesReply);
1013 // follower returns its next index
1014 assertEquals(2, appendEntriesReply.getLogLastIndex());
1015 assertEquals(1, appendEntriesReply.getLogLastTerm());
1022 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1023 new JavaTestKit(getSystem()) {{
1025 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
1027 MockRaftActorContext leaderActorContext =
1028 new MockRaftActorContext("leader", getSystem(), leaderActor);
1030 ActorRef followerActor = getSystem().actorOf(
1031 Props.create(ForwardMessageToBehaviorActor.class));
1033 MockRaftActorContext followerActorContext =
1034 new MockRaftActorContext("follower", getSystem(), followerActor);
1036 Follower follower = new Follower(followerActorContext);
1038 ForwardMessageToBehaviorActor.setBehavior(follower);
1040 Map<String, String> peerAddresses = new HashMap<>();
1041 peerAddresses.put(followerActor.path().toString(),
1042 followerActor.path().toString());
1044 leaderActorContext.setPeerAddresses(peerAddresses);
1046 leaderActorContext.getReplicatedLog().removeFrom(0);
1048 leaderActorContext.setReplicatedLog(
1049 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1051 leaderActorContext.setCommitIndex(1);
1053 followerActorContext.getReplicatedLog().removeFrom(0);
1055 followerActorContext.setReplicatedLog(
1056 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1058 // follower has the same log entries but its commit index > leaders commit index
1059 followerActorContext.setCommitIndex(2);
1061 Leader leader = new Leader(leaderActorContext);
1062 leader.markFollowerActive(followerActor.path().toString());
1064 Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
1066 leader.handleMessage(leaderActor, new SendHeartBeat());
1068 AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
1069 .getFirstMatching(followerActor, AppendEntries.class);
1071 assertNotNull(appendEntries);
1073 assertEquals(1, appendEntries.getLeaderCommit());
1074 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1075 assertEquals(0, appendEntries.getPrevLogIndex());
1077 AppendEntriesReply appendEntriesReply =
1078 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
1079 leaderActor, AppendEntriesReply.class);
1081 assertNotNull(appendEntriesReply);
1083 assertEquals(2, appendEntriesReply.getLogLastIndex());
1084 assertEquals(1, appendEntriesReply.getLogLastTerm());
1090 public void testHandleAppendEntriesReplyFailure(){
1091 new JavaTestKit(getSystem()) {
1094 ActorRef leaderActor =
1095 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1097 ActorRef followerActor =
1098 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1101 MockRaftActorContext leaderActorContext =
1102 new MockRaftActorContext("leader", getSystem(), leaderActor);
1104 Map<String, String> peerAddresses = new HashMap<>();
1105 peerAddresses.put("follower-1",
1106 followerActor.path().toString());
1108 leaderActorContext.setPeerAddresses(peerAddresses);
1110 Leader leader = new Leader(leaderActorContext);
1112 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1114 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1116 assertEquals(RaftState.Leader, raftActorBehavior.state());
1122 public void testHandleAppendEntriesReplySuccess() throws Exception {
1123 new JavaTestKit(getSystem()) {
1126 ActorRef leaderActor =
1127 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1129 ActorRef followerActor =
1130 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1133 MockRaftActorContext leaderActorContext =
1134 new MockRaftActorContext("leader", getSystem(), leaderActor);
1136 leaderActorContext.setReplicatedLog(
1137 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1139 Map<String, String> peerAddresses = new HashMap<>();
1140 peerAddresses.put("follower-1",
1141 followerActor.path().toString());
1143 leaderActorContext.setPeerAddresses(peerAddresses);
1144 leaderActorContext.setCommitIndex(1);
1145 leaderActorContext.setLastApplied(1);
1146 leaderActorContext.getTermInformation().update(1, "leader");
1148 Leader leader = new Leader(leaderActorContext);
1150 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1152 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1154 assertEquals(RaftState.Leader, raftActorBehavior.state());
1156 assertEquals(2, leaderActorContext.getCommitIndex());
1158 ApplyLogEntries applyLogEntries =
1159 (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
1160 ApplyLogEntries.class);
1162 assertNotNull(applyLogEntries);
1164 assertEquals(2, leaderActorContext.getLastApplied());
1166 assertEquals(2, applyLogEntries.getToIndex());
1168 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1171 assertEquals(1,applyStateList.size());
1173 ApplyState applyState = (ApplyState) applyStateList.get(0);
1175 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1181 public void testHandleAppendEntriesReplyUnknownFollower(){
1182 new JavaTestKit(getSystem()) {
1185 ActorRef leaderActor =
1186 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1188 MockRaftActorContext leaderActorContext =
1189 new MockRaftActorContext("leader", getSystem(), leaderActor);
1191 Leader leader = new Leader(leaderActorContext);
1193 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1195 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1197 assertEquals(RaftState.Leader, raftActorBehavior.state());
1203 public void testHandleRequestVoteReply(){
1204 new JavaTestKit(getSystem()) {
1207 ActorRef leaderActor =
1208 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1210 MockRaftActorContext leaderActorContext =
1211 new MockRaftActorContext("leader", getSystem(), leaderActor);
1213 Leader leader = new Leader(leaderActorContext);
1215 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1217 assertEquals(RaftState.Leader, raftActorBehavior.state());
1219 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1221 assertEquals(RaftState.Leader, raftActorBehavior.state());
1226 public void testIsolatedLeaderCheckNoFollowers() {
1227 new JavaTestKit(getSystem()) {{
1228 ActorRef leaderActor = getTestActor();
1230 MockRaftActorContext leaderActorContext =
1231 new MockRaftActorContext("leader", getSystem(), leaderActor);
1233 Map<String, String> peerAddresses = new HashMap<>();
1234 leaderActorContext.setPeerAddresses(peerAddresses);
1236 Leader leader = new Leader(leaderActorContext);
1237 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1238 Assert.assertTrue(behavior instanceof Leader);
1243 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1244 new JavaTestKit(getSystem()) {{
1246 ActorRef followerActor1 = getTestActor();
1247 ActorRef followerActor2 = getTestActor();
1249 MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1251 Map<String, String> peerAddresses = new HashMap<>();
1252 peerAddresses.put("follower-1", followerActor1.path().toString());
1253 peerAddresses.put("follower-2", followerActor2.path().toString());
1255 leaderActorContext.setPeerAddresses(peerAddresses);
1257 Leader leader = new Leader(leaderActorContext);
1258 leader.stopIsolatedLeaderCheckSchedule();
1260 leader.markFollowerActive("follower-1");
1261 leader.markFollowerActive("follower-2");
1262 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1263 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1264 behavior instanceof Leader);
1266 // kill 1 follower and verify if that got killed
1267 final JavaTestKit probe = new JavaTestKit(getSystem());
1268 probe.watch(followerActor1);
1269 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1270 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1271 assertEquals(termMsg1.getActor(), followerActor1);
1273 leader.markFollowerInActive("follower-1");
1274 leader.markFollowerActive("follower-2");
1275 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1276 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1277 behavior instanceof Leader);
1279 // kill 2nd follower and leader should change to Isolated leader
1280 followerActor2.tell(PoisonPill.getInstance(), null);
1281 probe.watch(followerActor2);
1282 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1283 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1284 assertEquals(termMsg2.getActor(), followerActor2);
1286 leader.markFollowerInActive("follower-2");
1287 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1288 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1289 behavior instanceof IsolatedLeader);
1296 public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1297 new JavaTestKit(getSystem()) {{
1299 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
1301 MockRaftActorContext leaderActorContext =
1302 new MockRaftActorContext("leader", getSystem(), leaderActor);
1304 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1305 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1306 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1308 leaderActorContext.setConfigParams(configParams);
1310 ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
1312 MockRaftActorContext followerActorContext =
1313 new MockRaftActorContext("follower-reply", getSystem(), followerActor);
1315 followerActorContext.setConfigParams(configParams);
1317 Follower follower = new Follower(followerActorContext);
1319 ForwardMessageToBehaviorActor.setBehavior(follower);
1321 Map<String, String> peerAddresses = new HashMap<>();
1322 peerAddresses.put("follower-reply",
1323 followerActor.path().toString());
1325 leaderActorContext.setPeerAddresses(peerAddresses);
1327 leaderActorContext.getReplicatedLog().removeFrom(0);
1330 leaderActorContext.setReplicatedLog(
1331 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1333 leaderActorContext.setCommitIndex(1);
1335 Leader leader = new Leader(leaderActorContext);
1336 leader.markFollowerActive("follower-reply");
1338 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1339 TimeUnit.MILLISECONDS);
1341 leader.handleMessage(leaderActor, new SendHeartBeat());
1343 AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
1344 .getFirstMatching(followerActor, AppendEntries.class);
1346 assertNotNull(appendEntries);
1348 assertEquals(1, appendEntries.getLeaderCommit());
1349 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1350 assertEquals(0, appendEntries.getPrevLogIndex());
1352 AppendEntriesReply appendEntriesReply =
1353 (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
1355 assertNotNull(appendEntriesReply);
1357 leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1359 List<Object> entries = ForwardMessageToBehaviorActor
1360 .getAllMatching(followerActor, AppendEntries.class);
1362 assertEquals("AppendEntries count should be 2 ", 2, entries.size());
1364 AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
1366 assertEquals(1, appendEntriesSecond.getLeaderCommit());
1367 assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
1368 assertEquals(1, appendEntriesSecond.getPrevLogIndex());
1373 class MockLeader extends Leader {
1375 FollowerToSnapshot fts;
1377 public MockLeader(RaftActorContext context){
1381 public FollowerToSnapshot getFollowerToSnapshot() {
1385 public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1386 fts = new FollowerToSnapshot(bs);
1387 setFollowerSnapshot(followerId, fts);
1391 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1393 private final long electionTimeOutIntervalMillis;
1394 private final int snapshotChunkSize;
1396 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1398 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1399 this.snapshotChunkSize = snapshotChunkSize;
1403 public FiniteDuration getElectionTimeOutInterval() {
1404 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1408 public int getSnapshotChunkSize() {
1409 return snapshotChunkSize;