1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.util.HashMap;
19 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Assert;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftState;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
47 import scala.concurrent.duration.FiniteDuration;
49 public class LeaderTest extends AbstractRaftActorBehaviorTest {
// Shared actors backed by DoNothingActor: leaderActor is the actor handed to the default
// actor context (see createActorContext()), senderActor is passed as the sender of test messages.
51 private final ActorRef leaderActor =
52 getSystem().actorOf(Props.create(DoNothingActor.class));
53 private final ActorRef senderActor =
54 getSystem().actorOf(Props.create(DoNothingActor.class));
// Passes a message the test treats as unknown (the string "foo") to a Leader and checks
// that handleMessage still returns a Leader behavior.
57 public void testHandleMessageForUnknownMessage() throws Exception {
58 new JavaTestKit(getSystem()) {{
60 new Leader(createActorContext());
62 // handle message should return the Leader state when it receives an
64 RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
65 Assert.assertTrue(behavior instanceof Leader);
// Registers the test kit's own actor as the single peer, marks it active, waits one
// heartbeat interval and then hands the leader a SendHeartBeat. The follower is expected
// to receive an AppendEntries message within one second.
70 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
71 new JavaTestKit(getSystem()) {{
73 new Within(duration("1 seconds")) {
75 protected void run() {
77 ActorRef followerActor = getTestActor();
79 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
81 Map<String, String> peerAddresses = new HashMap<>();
// the follower's actor path is used both as the peer id and as its address
83 peerAddresses.put(followerActor.path().toString(),
84 followerActor.path().toString());
86 actorContext.setPeerAddresses(peerAddresses);
88 Leader leader = new Leader(actorContext);
89 leader.markFollowerActive(followerActor.path().toString());
// let one full heartbeat interval elapse before delivering SendHeartBeat
90 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
91 TimeUnit.MILLISECONDS);
92 leader.handleMessage(senderActor, new SendHeartBeat());
95 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
96 // do not put code outside this method, will run afterwards
98 protected String match(Object in) {
99 Object msg = fromSerializableMessage(in);
100 if (msg instanceof AppendEntries) {
// checks the term carried by the AppendEntries against 0
101 if (((AppendEntries)msg).getTerm() == 0) {
109 }.get(); // this extracts the received message
111 assertEquals("match", out);
// Same single-follower setup as the heartbeat test, but the leader is given a Replicate
// message carrying a mock log entry with payload "foo". The behavior must stay Leader and
// the follower is expected to receive an AppendEntries message within one second.
119 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
120 new JavaTestKit(getSystem()) {{
122 new Within(duration("1 seconds")) {
124 protected void run() {
126 ActorRef followerActor = getTestActor();
128 MockRaftActorContext actorContext =
129 (MockRaftActorContext) createActorContext();
131 Map<String, String> peerAddresses = new HashMap<>();
// the follower's actor path is used both as the peer id and as its address
133 peerAddresses.put(followerActor.path().toString(),
134 followerActor.path().toString());
136 actorContext.setPeerAddresses(peerAddresses);
138 Leader leader = new Leader(actorContext);
139 leader.markFollowerActive(followerActor.path().toString());
// let one full heartbeat interval elapse before delivering the Replicate message
140 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
141 TimeUnit.MILLISECONDS);
142 RaftActorBehavior raftBehavior = leader
143 .handleMessage(senderActor, new Replicate(null, null,
144 new MockRaftActorContext.MockReplicatedLogEntry(1,
146 new MockRaftActorContext.MockPayload("foo"))
149 // State should not change
150 assertTrue(raftBehavior instanceof Leader);
153 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
154 // do not put code outside this method, will run afterwards
156 protected String match(Object in) {
157 Object msg = fromSerializableMessage(in);
158 if (msg instanceof AppendEntries) {
// checks the term carried by the AppendEntries against 0
159 if (((AppendEntries)msg).getTerm() == 0) {
167 }.get(); // this extracts the received message
169 assertEquals("match", out);
// Builds a leader whose context has no peer addresses configured in this test and whose
// raft actor is the test kit's actor. Replicating the log entry at index 1 under the
// identifier "state-id" must keep the behavior Leader, move the commit index to 1, and
// deliver an ApplyState with that identifier to the raft actor.
176 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
177 new JavaTestKit(getSystem()) {{
179 new Within(duration("1 seconds")) {
181 protected void run() {
183 ActorRef raftActor = getTestActor();
185 MockRaftActorContext actorContext =
186 new MockRaftActorContext("test", getSystem(), raftActor);
// clear the context's current log before installing the entries built below
188 actorContext.getReplicatedLog().removeFrom(0);
190 actorContext.setReplicatedLog(
191 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
194 Leader leader = new Leader(actorContext);
195 RaftActorBehavior raftBehavior = leader
196 .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
198 // State should not change
199 assertTrue(raftBehavior instanceof Leader);
201 assertEquals(1, actorContext.getCommitIndex());
204 new ExpectMsg<String>(duration("1 seconds"),
206 // do not put code outside this method, will run afterwards
208 protected String match(Object in) {
209 if (in instanceof ApplyState) {
// the ApplyState must carry the identifier that was passed in the Replicate message
210 if (((ApplyState) in).getIdentifier().equals("state-id")) {
218 }.get(); // this extracts the received message
220 assertEquals("match", out);
// Puts a MockLeader into the middle of a snapshot transfer (first chunk handed out, no
// reply yet) and checks that a heartbeat still makes it send an AppendEntries with no
// entries. Once the chunk is marked as sent successfully, the next heartbeat is expected
// to produce an InstallSnapshot whose last included index equals snapshotIndex.
228 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
229 new JavaTestKit(getSystem()) {{
// the follower collects every message it receives so they can be inspected afterwards
230 ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
232 Map<String, String> peerAddresses = new HashMap<>();
233 peerAddresses.put(followerActor.path().toString(),
234 followerActor.path().toString());
236 MockRaftActorContext actorContext =
237 (MockRaftActorContext) createActorContext(leaderActor);
238 actorContext.setPeerAddresses(peerAddresses);
// arbitrary map that is serialized below and used as the snapshot content
240 Map<String, String> leadersSnapshot = new HashMap<>();
241 leadersSnapshot.put("1", "A");
242 leadersSnapshot.put("2", "B");
243 leadersSnapshot.put("3", "C");
246 actorContext.getReplicatedLog().removeFrom(0);
248 final int followersLastIndex = 2;
249 final int snapshotIndex = 3;
250 final int newEntryIndex = 4;
251 final int snapshotTerm = 1;
252 final int currentTerm = 2;
254 // set the snapshot variables in replicatedlog
255 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
256 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
257 actorContext.setCommitIndex(followersLastIndex);
258 //set follower timeout to 2 mins, helps during debugging
259 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
261 MockLeader leader = new MockLeader(actorContext);
264 ReplicatedLogImplEntry entry =
265 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
266 new MockRaftActorContext.MockPayload("D"));
268 //update follower timestamp
269 leader.markFollowerActive(followerActor.path().toString());
271 ByteString bs = toByteString(leadersSnapshot);
272 leader.setSnapshot(Optional.of(bs));
273 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
275 //send first chunk and no InstallSnapshotReply received yet
276 leader.getFollowerToSnapshot().getNextChunk();
277 leader.getFollowerToSnapshot().incrementChunkIndex();
279 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
280 TimeUnit.MILLISECONDS);
282 leader.handleMessage(leaderActor, new SendHeartBeat());
284 AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
285 followerActor, AppendEntries.class);
287 assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
288 "received", aeproto);
290 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
292 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
294 //InstallSnapshotReply received
295 leader.getFollowerToSnapshot().markSendStatus(true);
297 leader.handleMessage(senderActor, new SendHeartBeat());
// the follower should now have collected the serialized InstallSnapshot for the next chunk
299 InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
300 MessageCollectorActor.getFirstMatching(followerActor,
301 InstallSnapshot.SERIALIZABLE_CLASS);
303 assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
306 InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
308 assertEquals(snapshotIndex, is.getLastIncludedIndex());
// Replicates a new entry while the commit index (followersLastIndex) is behind the log's
// snapshot index. The test kit actor serves both as the follower and as the leader's own
// actor, and is expected to receive an InitiateInstallSnapshot within two seconds.
314 public void testSendAppendEntriesSnapshotScenario() {
315 new JavaTestKit(getSystem()) {{
317 ActorRef followerActor = getTestActor();
319 Map<String, String> peerAddresses = new HashMap<>();
320 peerAddresses.put(followerActor.path().toString(),
321 followerActor.path().toString());
323 MockRaftActorContext actorContext =
324 (MockRaftActorContext) createActorContext(getRef());
325 actorContext.setPeerAddresses(peerAddresses);
327 Map<String, String> leadersSnapshot = new HashMap<>();
328 leadersSnapshot.put("1", "A");
329 leadersSnapshot.put("2", "B");
330 leadersSnapshot.put("3", "C");
333 actorContext.getReplicatedLog().removeFrom(0);
335 final int followersLastIndex = 2;
336 final int snapshotIndex = 3;
337 final int newEntryIndex = 4;
338 final int snapshotTerm = 1;
339 final int currentTerm = 2;
341 // set the snapshot variables in replicatedlog
342 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
343 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
344 actorContext.setCommitIndex(followersLastIndex);
346 Leader leader = new Leader(actorContext);
349 ReplicatedLogImplEntry entry =
350 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
351 new MockRaftActorContext.MockPayload("D"));
353 //update follower timestamp
354 leader.markFollowerActive(followerActor.path().toString());
356 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
357 TimeUnit.MILLISECONDS);
359 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
360 RaftActorBehavior raftBehavior = leader.handleMessage(
361 senderActor, new Replicate(null, "state-id", entry));
363 assertTrue(raftBehavior instanceof Leader);
365 // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
366 Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
368 protected Boolean match(Object o) throws Exception {
369 if (o instanceof InitiateInstallSnapshot) {
// OR all per-message results together: one InitiateInstallSnapshot is enough to pass
376 boolean initiateInitiateInstallSnapshot = false;
377 for (Boolean b: matches) {
378 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
381 assertTrue(initiateInitiateInstallSnapshot);
// With the leader's snapshot set to absent, an InitiateInstallSnapshot message should make
// the leader send a CaptureSnapshot to its own actor (a MessageCollectorActor here). The
// captured values are checked against the context set up below: last applied index 3 /
// term 1 and last log index 4 / term 2.
386 public void testInitiateInstallSnapshot() throws Exception {
387 new JavaTestKit(getSystem()) {{
// local collector actor; it shadows the class-level leaderActor field within this test
389 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
391 ActorRef followerActor = getTestActor();
393 Map<String, String> peerAddresses = new HashMap<>();
394 peerAddresses.put(followerActor.path().toString(),
395 followerActor.path().toString());
398 MockRaftActorContext actorContext =
399 (MockRaftActorContext) createActorContext(leaderActor);
400 actorContext.setPeerAddresses(peerAddresses);
402 Map<String, String> leadersSnapshot = new HashMap<>();
403 leadersSnapshot.put("1", "A");
404 leadersSnapshot.put("2", "B");
405 leadersSnapshot.put("3", "C");
408 actorContext.getReplicatedLog().removeFrom(0);
410 final int followersLastIndex = 2;
411 final int snapshotIndex = 3;
412 final int newEntryIndex = 4;
413 final int snapshotTerm = 1;
414 final int currentTerm = 2;
416 // set the snapshot variables in replicatedlog
417 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
418 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
419 actorContext.setLastApplied(3);
420 actorContext.setCommitIndex(followersLastIndex);
422 Leader leader = new Leader(actorContext);
423 // set the snapshot as absent and check if capture-snapshot is invoked.
424 leader.setSnapshot(Optional.<ByteString>absent());
427 ReplicatedLogImplEntry entry =
428 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
429 new MockRaftActorContext.MockPayload("D"));
// the appended entry (index 4, term 2) becomes the log's last entry checked below
431 actorContext.getReplicatedLog().append(entry);
433 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
434 RaftActorBehavior raftBehavior = leader.handleMessage(
435 leaderActor, new InitiateInstallSnapshot());
437 CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
438 getFirstMatching(leaderActor, CaptureSnapshot.class);
442 assertTrue(cs.isInstallSnapshotInitiated());
443 assertEquals(3, cs.getLastAppliedIndex());
444 assertEquals(1, cs.getLastAppliedTerm());
445 assertEquals(4, cs.getLastIndex());
446 assertEquals(2, cs.getLastTerm());
// Hands the leader a SendInstallSnapshot carrying the serialized snapshot and inspects the
// InstallSnapshot message that reaches the follower (the test kit actor): the data must be
// present and the last included index/term must equal snapshotIndex/snapshotTerm. Each
// failed check returns a descriptive string, which then fails the final "match" assertion.
451 public void testInstallSnapshot() {
452 new JavaTestKit(getSystem()) {{
454 ActorRef followerActor = getTestActor();
456 Map<String, String> peerAddresses = new HashMap<>();
457 peerAddresses.put(followerActor.path().toString(),
458 followerActor.path().toString());
460 MockRaftActorContext actorContext =
461 (MockRaftActorContext) createActorContext();
462 actorContext.setPeerAddresses(peerAddresses);
465 Map<String, String> leadersSnapshot = new HashMap<>();
466 leadersSnapshot.put("1", "A");
467 leadersSnapshot.put("2", "B");
468 leadersSnapshot.put("3", "C");
471 actorContext.getReplicatedLog().removeFrom(0);
473 final int followersLastIndex = 2;
474 final int snapshotIndex = 3;
475 final int newEntryIndex = 4;
476 final int snapshotTerm = 1;
477 final int currentTerm = 2;
479 // set the snapshot variables in replicatedlog
480 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
481 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
482 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
483 actorContext.setCommitIndex(followersLastIndex);
485 Leader leader = new Leader(actorContext);
488 ReplicatedLogImplEntry entry =
489 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
490 new MockRaftActorContext.MockPayload("D"));
492 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
493 new SendInstallSnapshot(toByteString(leadersSnapshot)));
495 assertTrue(raftBehavior instanceof Leader);
497 // check if installsnapshot gets called with the correct values.
499 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
500 // do not put code outside this method, will run afterwards
502 protected String match(Object in) {
503 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
504 InstallSnapshot is = (InstallSnapshot)
505 SerializationUtils.fromSerializable(in);
506 if (is.getData() == null) {
507 return "InstallSnapshot data is null";
509 if (is.getLastIncludedIndex() != snapshotIndex) {
510 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
512 if (is.getLastIncludedTerm() != snapshotTerm) {
513 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
// NOTE(review): this check uses == while its message says "!=" and the two checks above
// use != -- confirm whether != was intended, and what term the mock context reports after
// getTermInformation().update(...), before changing it.
515 if (is.getTerm() == currentTerm) {
516 return is.getTerm() + "!=" + currentTerm;
522 return "message mismatch:" + in.getClass();
525 }.get(); // this extracts the received message
527 assertEquals("match", out);
// Advances the follower's snapshot transfer state to its last chunk and then delivers a
// successful InstallSnapshotReply for that chunk. Afterwards the leader should track no
// snapshot transfer (followerSnapshotSize 0), still know one follower, and report
// matchIndex == snapshotIndex and nextIndex == snapshotIndex + 1 for it.
532 public void testHandleInstallSnapshotReplyLastChunk() {
533 new JavaTestKit(getSystem()) {{
535 ActorRef followerActor = getTestActor();
537 Map<String, String> peerAddresses = new HashMap<>();
538 peerAddresses.put(followerActor.path().toString(),
539 followerActor.path().toString());
541 final int followersLastIndex = 2;
542 final int snapshotIndex = 3;
543 final int newEntryIndex = 4;
544 final int snapshotTerm = 1;
545 final int currentTerm = 2;
547 MockRaftActorContext actorContext =
548 (MockRaftActorContext) createActorContext();
549 actorContext.setPeerAddresses(peerAddresses);
550 actorContext.setCommitIndex(followersLastIndex);
552 MockLeader leader = new MockLeader(actorContext);
554 Map<String, String> leadersSnapshot = new HashMap<>();
555 leadersSnapshot.put("1", "A");
556 leadersSnapshot.put("2", "B");
557 leadersSnapshot.put("3", "C");
559 // set the snapshot variables in replicatedlog
561 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
562 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
563 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
565 ByteString bs = toByteString(leadersSnapshot);
566 leader.setSnapshot(Optional.of(bs));
567 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
// consume chunks until the transfer state points at the final chunk
568 while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
569 leader.getFollowerToSnapshot().getNextChunk();
570 leader.getFollowerToSnapshot().incrementChunkIndex();
574 actorContext.getReplicatedLog().removeFrom(0);
// successful reply for the chunk index the transfer state currently points at
576 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
577 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
578 leader.getFollowerToSnapshot().getChunkIndex(), true));
580 assertTrue(raftBehavior instanceof Leader);
582 assertEquals(0, leader.followerSnapshotSize());
583 assertEquals(1, leader.followerLogSize());
584 assertNotNull(leader.getFollower(followerActor.path().toString()));
585 FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
// NOTE(review): the matchIndex assertion appears twice in a row; one of them looks redundant.
586 assertEquals(snapshotIndex, fli.getMatchIndex());
587 assertEquals(snapshotIndex, fli.getMatchIndex());
588 assertEquals(snapshotIndex + 1, fli.getNextIndex());
// Drives a snapshot transfer that the test expects to consist of three chunks. Each
// InstallSnapshot collected by the follower is answered with a successful
// InstallSnapshotReply, and the test checks that every reply triggers exactly one more
// chunk and that a reply after the third chunk sends nothing further.
592 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
593 new JavaTestKit(getSystem()) {{
595 TestActorRef<MessageCollectorActor> followerActor =
596 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
598 Map<String, String> peerAddresses = new HashMap<>();
// unlike most tests in this class, the peer id is the fixed name rather than the actor path
599 peerAddresses.put("follower-reply",
600 followerActor.path().toString());
602 final int followersLastIndex = 2;
603 final int snapshotIndex = 3;
604 final int snapshotTerm = 1;
605 final int currentTerm = 2;
607 MockRaftActorContext actorContext =
608 (MockRaftActorContext) createActorContext();
609 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
611 public int getSnapshotChunkSize() {
// long intervals -- presumably so that timer-driven messages do not disturb the message
// counts asserted below (TODO confirm)
615 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
616 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
618 actorContext.setConfigParams(configParams);
619 actorContext.setPeerAddresses(peerAddresses);
620 actorContext.setCommitIndex(followersLastIndex);
622 MockLeader leader = new MockLeader(actorContext);
624 Map<String, String> leadersSnapshot = new HashMap<>();
625 leadersSnapshot.put("1", "A");
626 leadersSnapshot.put("2", "B");
627 leadersSnapshot.put("3", "C");
629 // set the snapshot variables in replicatedlog
630 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
631 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
632 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
634 ByteString bs = toByteString(leadersSnapshot);
635 leader.setSnapshot(Optional.of(bs));
// starts the transfer: the first chunk should be sent to the follower
637 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
639 List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
640 InstallSnapshotMessages.InstallSnapshot.class);
642 assertEquals(1, objectList.size());
644 Object o = objectList.get(0);
645 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
647 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
649 assertEquals(1, installSnapshot.getChunkIndex());
650 assertEquals(3, installSnapshot.getTotalChunks());
// acknowledge chunk 1; a second InstallSnapshot is expected
652 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
653 "follower-reply", installSnapshot.getChunkIndex(), true));
655 objectList = MessageCollectorActor.getAllMatching(followerActor,
656 InstallSnapshotMessages.InstallSnapshot.class);
658 assertEquals(2, objectList.size());
660 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
// acknowledge the second message; a third InstallSnapshot is expected
662 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
663 "follower-reply", installSnapshot.getChunkIndex(), true));
665 objectList = MessageCollectorActor.getAllMatching(followerActor,
666 InstallSnapshotMessages.InstallSnapshot.class);
668 assertEquals(3, objectList.size());
670 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
672 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
673 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
674 "follower-reply", installSnapshot.getChunkIndex(), true));
676 objectList = MessageCollectorActor.getAllMatching(followerActor,
677 InstallSnapshotMessages.InstallSnapshot.class);
679 // Count should still stay at 3
680 assertEquals(3, objectList.size());
// After the first chunk has been sent, the follower answers with chunk index -1 and
// success=false. Following one heartbeat interval and a SendHeartBeat, the second
// InstallSnapshot collected by the follower is expected to be chunk 1 of 3 again.
686 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
687 new JavaTestKit(getSystem()) {{
689 TestActorRef<MessageCollectorActor> followerActor =
690 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
692 Map<String, String> peerAddresses = new HashMap<>();
693 peerAddresses.put(followerActor.path().toString(),
694 followerActor.path().toString());
696 final int followersLastIndex = 2;
697 final int snapshotIndex = 3;
698 final int snapshotTerm = 1;
699 final int currentTerm = 2;
701 MockRaftActorContext actorContext =
702 (MockRaftActorContext) createActorContext();
// config params with an overridden snapshot chunk size
704 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
706 public int getSnapshotChunkSize() {
710 actorContext.setPeerAddresses(peerAddresses);
711 actorContext.setCommitIndex(followersLastIndex);
713 MockLeader leader = new MockLeader(actorContext);
715 Map<String, String> leadersSnapshot = new HashMap<>();
716 leadersSnapshot.put("1", "A");
717 leadersSnapshot.put("2", "B");
718 leadersSnapshot.put("3", "C");
720 // set the snapshot variables in replicatedlog
721 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
722 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
723 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
725 ByteString bs = toByteString(leadersSnapshot);
726 leader.setSnapshot(Optional.of(bs));
728 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
730 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
732 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
734 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
736 assertEquals(1, installSnapshot.getChunkIndex());
737 assertEquals(3, installSnapshot.getTotalChunks());
// failed reply carrying an invalid chunk index (-1)
740 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
741 followerActor.path().toString(), -1, false));
743 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
744 TimeUnit.MILLISECONDS);
746 leader.handleMessage(leaderActor, new SendHeartBeat());
// second InstallSnapshot collected by the follower
748 o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
750 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
752 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
// the same chunk (1 of 3) is expected again
754 assertEquals(1, installSnapshot.getChunkIndex());
755 assertEquals(3, installSnapshot.getTotalChunks());
// stop the named follower actor at the end of the test
757 followerActor.tell(PoisonPill.getInstance(), getRef());
// Checks the lastChunkHashCode field of consecutive InstallSnapshot messages: the first
// chunk must carry AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, and after a successful
// reply for chunk 1 plus a heartbeat, the next collected message must be chunk 2 of 3
// carrying the hash code of chunk 1's data.
762 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
763 new JavaTestKit(getSystem()) {
766 TestActorRef<MessageCollectorActor> followerActor =
767 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
769 Map<String, String> peerAddresses = new HashMap<>();
770 peerAddresses.put(followerActor.path().toString(),
771 followerActor.path().toString());
773 final int followersLastIndex = 2;
774 final int snapshotIndex = 3;
775 final int snapshotTerm = 1;
776 final int currentTerm = 2;
778 MockRaftActorContext actorContext =
779 (MockRaftActorContext) createActorContext();
// config params with an overridden snapshot chunk size
781 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
783 public int getSnapshotChunkSize() {
787 actorContext.setPeerAddresses(peerAddresses);
788 actorContext.setCommitIndex(followersLastIndex);
790 MockLeader leader = new MockLeader(actorContext);
792 Map<String, String> leadersSnapshot = new HashMap<>();
793 leadersSnapshot.put("1", "A");
794 leadersSnapshot.put("2", "B");
795 leadersSnapshot.put("3", "C");
797 // set the snapshot variables in replicatedlog
798 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
799 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
800 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
802 ByteString bs = toByteString(leadersSnapshot);
803 leader.setSnapshot(Optional.of(bs));
805 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
807 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
809 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
811 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
813 assertEquals(1, installSnapshot.getChunkIndex());
814 assertEquals(3, installSnapshot.getTotalChunks());
815 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
// remember the hash of the first chunk's data; the next chunk must report it
817 int hashCode = installSnapshot.getData().hashCode();
819 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
821 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
823 leader.handleMessage(leaderActor, new SendHeartBeat());
825 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
827 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
829 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
831 assertEquals(2, installSnapshot.getChunkIndex());
832 assertEquals(3, installSnapshot.getTotalChunks());
833 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
// stop the named follower actor at the end of the test
835 followerActor.tell(PoisonPill.getInstance(), getRef());
// Walks the serialized snapshot in steps of 50 bytes and, for each step, compares the size
// and chunk index of the chunk returned by getFollowerToSnapshot().getNextChunk() with the
// values computed by the loop. Finally compares the loop's chunk count with getTotalChunks().
840 public void testFollowerToSnapshotLogic() {
842 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
// config params with an overridden snapshot chunk size
844 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
846 public int getSnapshotChunkSize() {
851 MockLeader leader = new MockLeader(actorContext);
853 Map<String, String> leadersSnapshot = new HashMap<>();
854 leadersSnapshot.put("1", "A");
855 leadersSnapshot.put("2", "B");
856 leadersSnapshot.put("3", "C");
858 ByteString bs = toByteString(leadersSnapshot);
859 byte[] barray = bs.toByteArray();
861 leader.createFollowerToSnapshot("followerId", bs);
862 assertEquals(bs.size(), barray.length);
865 for (int i=0; i < barray.length; i = i + 50) {
// special handling when fewer than 50 bytes remain (the final, shorter chunk)
869 if (i + 50 > barray.length) {
873 ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
874 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
875 assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
// mark the chunk as sent and move on unless this was the last chunk
877 leader.getFollowerToSnapshot().markSendStatus(true);
878 if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
879 leader.getFollowerToSnapshot().incrementChunkIndex();
883 assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
// Supplies the behavior under test: a Leader built on the given actor context.
887 @Override protected RaftActorBehavior createBehavior(
888 RaftActorContext actorContext) {
889 return new Leader(actorContext);
// Default context for these tests: delegates to createActorContext(ActorRef) with the
// shared leaderActor.
892 @Override protected RaftActorContext createActorContext() {
893 return createActorContext(leaderActor);
// Creates a MockRaftActorContext with the id "test" on the test actor system, using the
// given actor reference as the context's actor.
897 protected RaftActorContext createActorContext(ActorRef actorRef) {
898 return new MockRaftActorContext("test", getSystem(), actorRef);
// Serializes the given map with Java object serialization (ObjectOutputStream over a
// ByteArrayOutputStream) and wraps the resulting bytes in a protobuf ByteString. An
// IOException during serialization fails the running test via Assert.fail.
901 private ByteString toByteString(Map<String, String> state) {
902 ByteArrayOutputStream b = null;
903 ObjectOutputStream o = null;
906 b = new ByteArrayOutputStream();
907 o = new ObjectOutputStream(b);
908 o.writeObject(state);
// copy the bytes written so far into an immutable ByteString
909 byte[] snapshotBytes = b.toByteArray();
910 return ByteString.copyFrom(snapshotBytes);
920 } catch (IOException e) {
921 Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
// Test actor that first lets MessageCollectorActor handle each received message and then
// forwards the message, together with its sender, to a raft behavior set via setBehavior.
926 public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
// NOTE(review): this field is static, so one behavior is shared by every instance of this
// actor class; setBehavior must be called before any message arrives.
927 private static AbstractRaftActorBehavior behavior;
929 public ForwardMessageToBehaviorActor(){
933 @Override public void onReceive(Object message) throws Exception {
934 super.onReceive(message);
935 behavior.handleMessage(sender(), message);
// Sets the behavior that every ForwardMessageToBehaviorActor forwards messages to.
938 public static void setBehavior(AbstractRaftActorBehavior behavior){
939 ForwardMessageToBehaviorActor.behavior = behavior;
// Wires a Leader to a real Follower behavior (through ForwardMessageToBehaviorActor) where
// both contexts are given logs built with createEntries(0, 3, 1) and commit index 1.
// After a heartbeat the test inspects the AppendEntries collected by the follower
// (leaderCommit 1, first entry index 1, prevLogIndex 0) and the AppendEntriesReply
// collected by the leader actor (logLastIndex 2, logLastTerm 1).
944 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
945 new JavaTestKit(getSystem()) {{
// local collector actor; it shadows the class-level leaderActor field within this test
947 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
949 MockRaftActorContext leaderActorContext =
950 new MockRaftActorContext("leader", getSystem(), leaderActor);
952 ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
954 MockRaftActorContext followerActorContext =
955 new MockRaftActorContext("follower", getSystem(), followerActor);
957 Follower follower = new Follower(followerActorContext);
// messages received by followerActor are forwarded to this Follower behavior
959 ForwardMessageToBehaviorActor.setBehavior(follower);
961 Map<String, String> peerAddresses = new HashMap<>();
962 peerAddresses.put(followerActor.path().toString(),
963 followerActor.path().toString());
965 leaderActorContext.setPeerAddresses(peerAddresses);
967 leaderActorContext.getReplicatedLog().removeFrom(0);
970 leaderActorContext.setReplicatedLog(
971 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
973 leaderActorContext.setCommitIndex(1);
975 followerActorContext.getReplicatedLog().removeFrom(0);
977 // follower too has the exact same log entries and has the same commit index
978 followerActorContext.setReplicatedLog(
979 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
981 followerActorContext.setCommitIndex(1);
983 Leader leader = new Leader(leaderActorContext);
984 leader.markFollowerActive(followerActor.path().toString());
986 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
987 TimeUnit.MILLISECONDS);
989 leader.handleMessage(leaderActor, new SendHeartBeat());
991 AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
992 .getFirstMatching(followerActor, AppendEntries.class);
994 assertNotNull(appendEntries);
996 assertEquals(1, appendEntries.getLeaderCommit());
997 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
998 assertEquals(0, appendEntries.getPrevLogIndex());
// the reply produced by the Follower behavior is collected by the leader actor
1000 AppendEntriesReply appendEntriesReply =
1001 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
1002 leaderActor, AppendEntriesReply.class);
1004 assertNotNull(appendEntriesReply);
1006 // follower returns its next index
1007 assertEquals(2, appendEntriesReply.getLogLastIndex());
1008 assertEquals(1, appendEntriesReply.getLogLastTerm());
1015 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1016 new JavaTestKit(getSystem()) {{
1018 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
1020 MockRaftActorContext leaderActorContext =
1021 new MockRaftActorContext("leader", getSystem(), leaderActor);
1023 ActorRef followerActor = getSystem().actorOf(
1024 Props.create(ForwardMessageToBehaviorActor.class));
1026 MockRaftActorContext followerActorContext =
1027 new MockRaftActorContext("follower", getSystem(), followerActor);
1029 Follower follower = new Follower(followerActorContext);
1031 ForwardMessageToBehaviorActor.setBehavior(follower);
1033 Map<String, String> peerAddresses = new HashMap<>();
1034 peerAddresses.put(followerActor.path().toString(),
1035 followerActor.path().toString());
1037 leaderActorContext.setPeerAddresses(peerAddresses);
1039 leaderActorContext.getReplicatedLog().removeFrom(0);
1041 leaderActorContext.setReplicatedLog(
1042 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1044 leaderActorContext.setCommitIndex(1);
1046 followerActorContext.getReplicatedLog().removeFrom(0);
1048 followerActorContext.setReplicatedLog(
1049 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1051 // follower has the same log entries but its commit index > leaders commit index
1052 followerActorContext.setCommitIndex(2);
1054 Leader leader = new Leader(leaderActorContext);
1055 leader.markFollowerActive(followerActor.path().toString());
1057 Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
1059 leader.handleMessage(leaderActor, new SendHeartBeat());
1061 AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
1062 .getFirstMatching(followerActor, AppendEntries.class);
1064 assertNotNull(appendEntries);
1066 assertEquals(1, appendEntries.getLeaderCommit());
1067 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1068 assertEquals(0, appendEntries.getPrevLogIndex());
1070 AppendEntriesReply appendEntriesReply =
1071 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
1072 leaderActor, AppendEntriesReply.class);
1074 assertNotNull(appendEntriesReply);
1076 assertEquals(2, appendEntriesReply.getLogLastIndex());
1077 assertEquals(1, appendEntriesReply.getLogLastTerm());
1083 public void testHandleAppendEntriesReplyFailure(){
1084 new JavaTestKit(getSystem()) {
1087 ActorRef leaderActor =
1088 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1090 ActorRef followerActor =
1091 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1094 MockRaftActorContext leaderActorContext =
1095 new MockRaftActorContext("leader", getSystem(), leaderActor);
1097 Map<String, String> peerAddresses = new HashMap<>();
1098 peerAddresses.put("follower-1",
1099 followerActor.path().toString());
1101 leaderActorContext.setPeerAddresses(peerAddresses);
1103 Leader leader = new Leader(leaderActorContext);
1105 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1107 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1109 assertEquals(RaftState.Leader, raftActorBehavior.state());
// Hands the leader an AppendEntriesReply from "follower-1" whose boolean flag is true and
// checks that: the returned behavior is in the Leader state, the leader context's commit
// index and last-applied both become 2, an ApplyLogEntries with toIndex 2 was collected by
// the leader actor, and the collected apply-state list has exactly one element (index 2).
1115 public void testHandleAppendEntriesReplySuccess() throws Exception {
1116 new JavaTestKit(getSystem()) {
1119 ActorRef leaderActor =
1120 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1122 ActorRef followerActor =
1123 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1126 MockRaftActorContext leaderActorContext =
1127 new MockRaftActorContext("leader", getSystem(), leaderActor);
1129 leaderActorContext.setReplicatedLog(
1130 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
// Register the follower as the leader's only peer.
1132 Map<String, String> peerAddresses = new HashMap<>();
1133 peerAddresses.put("follower-1",
1134 followerActor.path().toString());
1136 leaderActorContext.setPeerAddresses(peerAddresses);
// Starting point: commit index and last-applied are both 1.
1137 leaderActorContext.setCommitIndex(1);
1138 leaderActorContext.setLastApplied(1);
// NOTE(review): presumably sets the current term to 1 with "leader" as the voted-for id - confirm.
1139 leaderActorContext.getTermInformation().update(1, "leader");
1141 Leader leader = new Leader(leaderActorContext);
// NOTE(review): arguments after the follower id are presumably term, success flag,
// last log index and last log term - confirm against AppendEntriesReply.
1143 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1145 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1147 assertEquals(RaftState.Leader, raftActorBehavior.state());
// The commit index moved from 1 to 2.
1149 assertEquals(2, leaderActorContext.getCommitIndex());
1151 ApplyLogEntries applyLogEntries =
1152 (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
1153 ApplyLogEntries.class);
1155 assertNotNull(applyLogEntries);
1157 assertEquals(2, leaderActorContext.getLastApplied());
1159 assertEquals(2, applyLogEntries.getToIndex());
// NOTE(review): the second argument of this getAllMatching call is not visible in this
// excerpt; the cast below suggests it is ApplyState.class - confirm.
1161 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1164 assertEquals(1,applyStateList.size());
1166 ApplyState applyState = (ApplyState) applyStateList.get(0);
1168 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1174 public void testHandleAppendEntriesReplyUnknownFollower(){
1175 new JavaTestKit(getSystem()) {
1178 ActorRef leaderActor =
1179 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1181 MockRaftActorContext leaderActorContext =
1182 new MockRaftActorContext("leader", getSystem(), leaderActor);
1184 Leader leader = new Leader(leaderActorContext);
1186 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1188 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1190 assertEquals(RaftState.Leader, raftActorBehavior.state());
1196 public void testHandleRequestVoteReply(){
1197 new JavaTestKit(getSystem()) {
1200 ActorRef leaderActor =
1201 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1203 MockRaftActorContext leaderActorContext =
1204 new MockRaftActorContext("leader", getSystem(), leaderActor);
1206 Leader leader = new Leader(leaderActorContext);
1208 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1210 assertEquals(RaftState.Leader, raftActorBehavior.state());
1212 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1214 assertEquals(RaftState.Leader, raftActorBehavior.state());
1219 public void testIsolatedLeaderCheckNoFollowers() {
1220 new JavaTestKit(getSystem()) {{
1221 ActorRef leaderActor = getTestActor();
1223 MockRaftActorContext leaderActorContext =
1224 new MockRaftActorContext("leader", getSystem(), leaderActor);
1226 Map<String, String> peerAddresses = new HashMap<>();
1227 leaderActorContext.setPeerAddresses(peerAddresses);
1229 Leader leader = new Leader(leaderActorContext);
1230 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1231 Assert.assertTrue(behavior instanceof Leader);
1236 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1237 new JavaTestKit(getSystem()) {{
1239 ActorRef followerActor1 = getTestActor();
1240 ActorRef followerActor2 = getTestActor();
1242 MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1244 Map<String, String> peerAddresses = new HashMap<>();
1245 peerAddresses.put("follower-1", followerActor1.path().toString());
1246 peerAddresses.put("follower-2", followerActor2.path().toString());
1248 leaderActorContext.setPeerAddresses(peerAddresses);
1250 Leader leader = new Leader(leaderActorContext);
1251 leader.stopIsolatedLeaderCheckSchedule();
1253 leader.markFollowerActive("follower-1");
1254 leader.markFollowerActive("follower-2");
1255 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1256 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1257 behavior instanceof Leader);
1259 // kill 1 follower and verify if that got killed
1260 final JavaTestKit probe = new JavaTestKit(getSystem());
1261 probe.watch(followerActor1);
1262 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1263 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1264 assertEquals(termMsg1.getActor(), followerActor1);
1266 leader.markFollowerInActive("follower-1");
1267 leader.markFollowerActive("follower-2");
1268 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1269 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1270 behavior instanceof Leader);
1272 // kill 2nd follower and leader should change to Isolated leader
1273 followerActor2.tell(PoisonPill.getInstance(), null);
1274 probe.watch(followerActor2);
1275 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1276 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1277 assertEquals(termMsg2.getActor(), followerActor2);
1279 leader.markFollowerInActive("follower-2");
1280 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1281 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1282 behavior instanceof IsolatedLeader);
1289 public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1290 new JavaTestKit(getSystem()) {{
1292 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
1294 MockRaftActorContext leaderActorContext =
1295 new MockRaftActorContext("leader", getSystem(), leaderActor);
1297 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1298 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1299 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1301 leaderActorContext.setConfigParams(configParams);
1303 ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
1305 MockRaftActorContext followerActorContext =
1306 new MockRaftActorContext("follower-reply", getSystem(), followerActor);
1308 followerActorContext.setConfigParams(configParams);
1310 Follower follower = new Follower(followerActorContext);
1312 ForwardMessageToBehaviorActor.setBehavior(follower);
1314 Map<String, String> peerAddresses = new HashMap<>();
1315 peerAddresses.put("follower-reply",
1316 followerActor.path().toString());
1318 leaderActorContext.setPeerAddresses(peerAddresses);
1320 leaderActorContext.getReplicatedLog().removeFrom(0);
1323 leaderActorContext.setReplicatedLog(
1324 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1326 leaderActorContext.setCommitIndex(1);
1328 Leader leader = new Leader(leaderActorContext);
1329 leader.markFollowerActive("follower-reply");
1331 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1332 TimeUnit.MILLISECONDS);
1334 leader.handleMessage(leaderActor, new SendHeartBeat());
1336 AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
1337 .getFirstMatching(followerActor, AppendEntries.class);
1339 assertNotNull(appendEntries);
1341 assertEquals(1, appendEntries.getLeaderCommit());
1342 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1343 assertEquals(0, appendEntries.getPrevLogIndex());
1345 AppendEntriesReply appendEntriesReply =
1346 (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
1348 assertNotNull(appendEntriesReply);
1350 leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1352 List<Object> entries = ForwardMessageToBehaviorActor
1353 .getAllMatching(followerActor, AppendEntries.class);
1355 assertEquals("AppendEntries count should be 2 ", 2, entries.size());
1357 AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
1359 assertEquals(1, appendEntriesSecond.getLeaderCommit());
1360 assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
1361 assertEquals(1, appendEntriesSecond.getPrevLogIndex());
// Leader subclass that keeps a reference to the FollowerToSnapshot it creates.
1366 class MockLeader extends Leader {
// Assigned by createFollowerToSnapshot().
1368 FollowerToSnapshot fts;
// NOTE(review): constructor body is not visible in this excerpt - presumably it passes
// context to the Leader constructor; confirm.
1370 public MockLeader(RaftActorContext context){
// NOTE(review): body is not visible in this excerpt - presumably returns fts; confirm.
1374 public FollowerToSnapshot getFollowerToSnapshot() {
// Builds a FollowerToSnapshot from bs, stores it in fts and hands it to
// setFollowerSnapshot for the given follower id.
1378 public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1379 fts = new FollowerToSnapshot(bs);
1380 setFollowerSnapshot(followerId, fts);
// DefaultConfigParamsImpl subclass whose election timeout and snapshot chunk size are the
// values passed to its constructor.
1384 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
// Election timeout in milliseconds, as passed to the constructor.
1386 private final long electionTimeOutIntervalMillis;
// Snapshot chunk size, as passed to the constructor.
1387 private final int snapshotChunkSize;
// Stores both arguments in the fields above.
1389 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1391 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1392 this.snapshotChunkSize = snapshotChunkSize;
// Returns the stored election timeout wrapped in a millisecond FiniteDuration.
1396 public FiniteDuration getElectionTimeOutInterval() {
1397 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
// Returns the stored snapshot chunk size.
1401 public int getSnapshotChunkSize() {
1402 return snapshotChunkSize;