1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.util.HashMap;
19 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Assert;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftState;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
35 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
40 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
43 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
44 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
45 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
46 import scala.concurrent.duration.FiniteDuration;
48 public class LeaderTest extends AbstractRaftActorBehaviorTest {
// Two DoNothingActor instances created from the test actor system and shared by the tests:
// leaderActor is the actor that createActorContext() builds its context around, and
// senderActor is passed as the sender argument of handleMessage calls.
50 private final ActorRef leaderActor =
51 getSystem().actorOf(Props.create(DoNothingActor.class));
52 private final ActorRef senderActor =
53 getSystem().actorOf(Props.create(DoNothingActor.class));
// Verifies that passing the plain String "foo" (standing in for an unknown message, per
// the test name) to Leader.handleMessage returns a behavior that is still a Leader.
56 public void testHandleMessageForUnknownMessage() throws Exception {
57 new JavaTestKit(getSystem()) {{
59 new Leader(createActorContext());
61 // handle message should return the Leader state when it receives an
63 RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
64 Assert.assertTrue(behavior instanceof Leader);
// Exercises the leader's heartbeat traffic towards a single follower, played by the
// JavaTestKit test actor registered under the id "follower":
//  1. after the Leader is constructed, an AppendEntries with no entries and a
//     prevLogIndex/prevLogTerm of -1 is expected;
//  2. an AppendEntriesReply (boolean flag true, index lastIndex - 1) is fed to the leader
//     by hand, after which the follower must be reported as active;
//  3. after sleeping one heartbeat interval, a SendHeartBeat is expected to produce an
//     AppendEntries carrying exactly one entry, the one at lastIndex.
69 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
70 new JavaTestKit(getSystem()) {{
71 new Within(duration("1 seconds")) {
73 protected void run() {
74 ActorRef followerActor = getTestActor();
76 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
78 Map<String, String> peerAddresses = new HashMap<>();
80 String followerId = "follower";
81 peerAddresses.put(followerId, followerActor.path().toString());
83 actorContext.setPeerAddresses(peerAddresses);
86 actorContext.getTermInformation().update(term, "");
88 Leader leader = new Leader(actorContext);
90 // Leader should send an immediate heartbeat with no entries as follower is inactive.
91 long lastIndex = actorContext.getReplicatedLog().lastIndex();
92 AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
93 assertEquals("getTerm", term, appendEntries.getTerm());
94 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
95 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
96 assertEquals("Entries size", 0, appendEntries.getEntries().size());
98 // The follower would normally reply - simulate that explicitly here.
// The reply acknowledges lastIndex - 1, so the entry at lastIndex is still outstanding.
99 leader.handleMessage(followerActor, new AppendEntriesReply(
100 followerId, term, true, lastIndex - 1, term));
101 assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
103 // Sleep for the heartbeat interval so AppendEntries is sent.
104 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
105 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
107 leader.handleMessage(senderActor, new SendHeartBeat());
109 appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
110 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
111 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
112 assertEquals("Entries size", 1, appendEntries.getEntries().size());
113 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
114 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
// Checks that a Replicate message for a newly appended entry makes the leader send that
// entry to the follower (the JavaTestKit test actor, id "follower"). The initial
// AppendEntries is consumed and answered by hand first, so the follower is reported as
// active; the AppendEntries that follows the Replicate must then have
// prevLogIndex == lastIndex and carry one entry at lastIndex + 1 with the "foo" payload.
121 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
122 new JavaTestKit(getSystem()) {{
123 new Within(duration("1 seconds")) {
125 protected void run() {
126 ActorRef followerActor = getTestActor();
128 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
130 Map<String, String> peerAddresses = new HashMap<>();
132 String followerId = "follower";
133 peerAddresses.put(followerId, followerActor.path().toString());
135 actorContext.setPeerAddresses(peerAddresses);
138 actorContext.getTermInformation().update(term, "");
140 Leader leader = new Leader(actorContext);
142 // Leader will send an immediate heartbeat - ignore it.
143 expectMsgClass(duration("5 seconds"), AppendEntries.class);
145 // The follower would normally reply - simulate that explicitly here.
146 long lastIndex = actorContext.getReplicatedLog().lastIndex();
147 leader.handleMessage(followerActor, new AppendEntriesReply(
148 followerId, term, true, lastIndex, term));
149 assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
151 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
152 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
153 1, lastIndex + 1, payload);
// The entry is appended to the leader's own log before the Replicate message is handled.
154 actorContext.getReplicatedLog().append(newEntry);
155 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
156 new Replicate(null, null, newEntry));
158 // State should not change
159 assertTrue(raftBehavior instanceof Leader);
161 AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
162 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
163 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
164 assertEquals("Entries size", 1, appendEntries.getEntries().size());
165 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
166 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
167 assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
// Checks Replicate handling on a context for which no peer addresses are set in this
// test: the behavior must remain a Leader, the commit index must be 1, and the raft
// actor (the JavaTestKit test actor) is checked for an ApplyState whose identifier is
// "state-id".
174 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
175 new JavaTestKit(getSystem()) {{
176 new Within(duration("1 seconds")) {
178 protected void run() {
180 ActorRef raftActor = getTestActor();
182 MockRaftActorContext actorContext =
183 new MockRaftActorContext("test", getSystem(), raftActor);
185 actorContext.getReplicatedLog().removeFrom(0);
// The default log is replaced by one produced with MockReplicatedLogBuilder.createEntries(0, 2, 1).
187 actorContext.setReplicatedLog(
188 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
191 Leader leader = new Leader(actorContext);
192 RaftActorBehavior raftBehavior = leader
193 .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
195 // State should not change
196 assertTrue(raftBehavior instanceof Leader);
198 assertEquals(1, actorContext.getCommitIndex());
201 new ExpectMsg<String>(duration("1 seconds"),
203 // do not put code outside this method, will run afterwards
205 protected String match(Object in) {
206 if (in instanceof ApplyState) {
207 if (((ApplyState) in).getIdentifier().equals("state-id")) {
215 }.get(); // this extracts the received message
217 assertEquals("match", out);
// Checks what a heartbeat sends while a snapshot install to a follower is under way.
// The follower is a MessageCollectorActor keyed by its own path. A FollowerToSnapshot is
// set up through MockLeader and advanced past its first chunk without marking any send
// status. The first SendHeartBeat must then yield an AppendEntries with no entries;
// after markSendStatus(true) (standing in for a received InstallSnapshotReply) the next
// SendHeartBeat must yield an InstallSnapshot whose lastIncludedIndex equals
// snapshotIndex.
225 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
226 new JavaTestKit(getSystem()) {{
227 ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
229 Map<String, String> peerAddresses = new HashMap<>();
230 peerAddresses.put(followerActor.path().toString(),
231 followerActor.path().toString());
233 MockRaftActorContext actorContext =
234 (MockRaftActorContext) createActorContext(leaderActor);
235 actorContext.setPeerAddresses(peerAddresses);
237 Map<String, String> leadersSnapshot = new HashMap<>();
238 leadersSnapshot.put("1", "A");
239 leadersSnapshot.put("2", "B");
240 leadersSnapshot.put("3", "C");
243 actorContext.getReplicatedLog().removeFrom(0);
245 final int followersLastIndex = 2;
246 final int snapshotIndex = 3;
247 final int newEntryIndex = 4;
248 final int snapshotTerm = 1;
249 final int currentTerm = 2;
251 // set the snapshot variables in replicatedlog
252 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
253 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
254 actorContext.setCommitIndex(followersLastIndex);
255 //set follower timeout to 2 mins, helps during debugging
256 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
258 MockLeader leader = new MockLeader(actorContext);
261 ReplicatedLogImplEntry entry =
262 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
263 new MockRaftActorContext.MockPayload("D"));
265 //update follower timestamp
266 leader.markFollowerActive(followerActor.path().toString());
268 ByteString bs = toByteString(leadersSnapshot);
269 leader.setSnapshot(Optional.of(bs));
270 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
272 //send first chunk and no InstallSnapshotReply received yet
273 leader.getFollowerToSnapshot().getNextChunk();
274 leader.getFollowerToSnapshot().incrementChunkIndex();
276 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
277 TimeUnit.MILLISECONDS);
279 leader.handleMessage(leaderActor, new SendHeartBeat());
281 AppendEntries aeproto = MessageCollectorActor.getFirstMatching(
282 followerActor, AppendEntries.class);
284 assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
285 "received", aeproto);
287 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
289 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
291 //InstallSnapshotReply received
292 leader.getFollowerToSnapshot().markSendStatus(true);
294 leader.handleMessage(senderActor, new SendHeartBeat());
296 InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.getFirstMatching(followerActor,
297 InstallSnapshot.SERIALIZABLE_CLASS);
299 assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
302 InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
304 assertEquals(snapshotIndex, is.getLastIncludedIndex());
// Scenario: the commit index (followersLastIndex = 2) is behind the snapshot index (3)
// when a Replicate for a new entry arrives. The behavior must stay Leader, and among the
// messages received during the 2 second ReceiveWhile window at least one must be a
// CaptureSnapshot (other messages, such as heartbeats, are tolerated).
310 public void testSendAppendEntriesSnapshotScenario() {
311 new JavaTestKit(getSystem()) {{
313 ActorRef followerActor = getTestActor();
315 Map<String, String> peerAddresses = new HashMap<>();
316 peerAddresses.put(followerActor.path().toString(),
317 followerActor.path().toString());
319 MockRaftActorContext actorContext =
320 (MockRaftActorContext) createActorContext(getRef());
321 actorContext.setPeerAddresses(peerAddresses);
323 Map<String, String> leadersSnapshot = new HashMap<>();
324 leadersSnapshot.put("1", "A");
325 leadersSnapshot.put("2", "B");
326 leadersSnapshot.put("3", "C");
329 actorContext.getReplicatedLog().removeFrom(0);
331 final int followersLastIndex = 2;
332 final int snapshotIndex = 3;
333 final int newEntryIndex = 4;
334 final int snapshotTerm = 1;
335 final int currentTerm = 2;
337 // set the snapshot variables in replicatedlog
338 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
339 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
340 actorContext.setCommitIndex(followersLastIndex);
342 Leader leader = new Leader(actorContext);
345 ReplicatedLogImplEntry entry =
346 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
347 new MockRaftActorContext.MockPayload("D"));
349 //update follower timestamp
350 leader.markFollowerActive(followerActor.path().toString());
352 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
353 TimeUnit.MILLISECONDS);
355 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
356 RaftActorBehavior raftBehavior = leader.handleMessage(
357 senderActor, new Replicate(null, "state-id", entry));
359 assertTrue(raftBehavior instanceof Leader);
361 // we might receive some heartbeat messages, so wait till we get CaptureSnapshot
362 Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
364 protected Boolean match(Object o) throws Exception {
365 if (o instanceof CaptureSnapshot) {
// OR all per-message results together: one CaptureSnapshot anywhere is enough.
372 boolean captureSnapshot = false;
373 for (Boolean b: matches) {
374 captureSnapshot = b | captureSnapshot;
377 assertTrue(captureSnapshot);
// Checks that a Replicate arriving while the leader holds no snapshot
// (setSnapshot(Optional.absent())) makes the leader actor -- a MessageCollectorActor in
// this test -- receive a CaptureSnapshot flagged as install-snapshot-initiated, with
// lastApplied index/term 3/1 and last index/term 4/2. A second Replicate handled
// afterwards must not add another CaptureSnapshot: the collected count has to stay at 1.
382 public void testInitiateInstallSnapshot() throws Exception {
383 new JavaTestKit(getSystem()) {{
// Local leaderActor shadows the field of the same name so that its messages can be inspected.
385 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
387 ActorRef followerActor = getTestActor();
389 Map<String, String> peerAddresses = new HashMap<>();
390 peerAddresses.put(followerActor.path().toString(), followerActor.path().toString());
392 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(leaderActor);
393 actorContext.setPeerAddresses(peerAddresses);
395 Map<String, String> leadersSnapshot = new HashMap<>();
396 leadersSnapshot.put("1", "A");
397 leadersSnapshot.put("2", "B");
398 leadersSnapshot.put("3", "C");
401 actorContext.getReplicatedLog().removeFrom(0);
403 final int followersLastIndex = 2;
404 final int snapshotIndex = 3;
405 final int newEntryIndex = 4;
406 final int snapshotTerm = 1;
407 final int currentTerm = 2;
409 // set the snapshot variables in replicatedlog
410 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
411 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
412 actorContext.setLastApplied(3);
413 actorContext.setCommitIndex(followersLastIndex);
415 Leader leader = new Leader(actorContext);
416 // set the snapshot as absent and check if capture-snapshot is invoked.
417 leader.setSnapshot(Optional.<ByteString>absent());
420 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
421 new MockRaftActorContext.MockPayload("D"));
423 actorContext.getReplicatedLog().append(entry);
425 //update follower timestamp
426 leader.markFollowerActive(followerActor.path().toString());
428 RaftActorBehavior raftBehavior = leader.handleMessage(
429 senderActor, new Replicate(null, "state-id", entry));
431 CaptureSnapshot cs = MessageCollectorActor.
432 getFirstMatching(leaderActor, CaptureSnapshot.class);
436 assertTrue(cs.isInstallSnapshotInitiated());
437 assertEquals(3, cs.getLastAppliedIndex());
438 assertEquals(1, cs.getLastAppliedTerm());
439 assertEquals(4, cs.getLastIndex());
440 assertEquals(2, cs.getLastTerm());
442 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
443 leader.handleMessage(senderActor, new Replicate(null, "state-id", entry));
444 List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
445 assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
// Checks the InstallSnapshot sent in response to a SendInstallSnapshot: after the initial
// heartbeat is consumed, the follower (the JavaTestKit test actor) must receive an
// InstallSnapshot with non-null data whose lastIncludedIndex, lastIncludedTerm and term
// equal snapshotIndex, snapshotTerm and currentTerm. The matcher returns a description
// of the first mismatch it finds, so the final assertEquals("match", out) reports it.
451 public void testInstallSnapshot() {
452 new JavaTestKit(getSystem()) {{
454 ActorRef followerActor = getTestActor();
456 Map<String, String> peerAddresses = new HashMap<>();
457 peerAddresses.put(followerActor.path().toString(),
458 followerActor.path().toString());
460 MockRaftActorContext actorContext =
461 (MockRaftActorContext) createActorContext();
462 actorContext.setPeerAddresses(peerAddresses);
465 Map<String, String> leadersSnapshot = new HashMap<>();
466 leadersSnapshot.put("1", "A");
467 leadersSnapshot.put("2", "B");
468 leadersSnapshot.put("3", "C");
471 actorContext.getReplicatedLog().removeFrom(0);
473 final int followersLastIndex = 2;
474 final int snapshotIndex = 3;
475 final int newEntryIndex = 4;
476 final int snapshotTerm = 1;
477 final int currentTerm = 2;
479 // set the snapshot variables in replicatedlog
480 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
481 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
482 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
483 actorContext.setCommitIndex(followersLastIndex);
485 Leader leader = new Leader(actorContext);
487 // Ignore initial heartbeat.
488 expectMsgClass(duration("5 seconds"), AppendEntries.class);
491 ReplicatedLogImplEntry entry =
492 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
493 new MockRaftActorContext.MockPayload("D"));
495 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
496 new SendInstallSnapshot(toByteString(leadersSnapshot)));
498 assertTrue(raftBehavior instanceof Leader);
500 // check if installsnapshot gets called with the correct values.
502 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
503 // do not put code outside this method, will run afterwards
505 protected String match(Object in) {
506 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
507 InstallSnapshot is = (InstallSnapshot)
508 SerializationUtils.fromSerializable(in);
509 if (is.getData() == null) {
510 return "InstallSnapshot data is null";
512 if (is.getLastIncludedIndex() != snapshotIndex) {
513 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
515 if (is.getLastIncludedTerm() != snapshotTerm) {
516 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
// Report a mismatch only when the message term differs from currentTerm, in line with
// the index and term checks above.
518 if (is.getTerm() != currentTerm) {
519 return is.getTerm() + "!=" + currentTerm;
525 return "message mismatch:" + in.getClass();
528 }.get(); // this extracts the received message
530 assertEquals("match", out);
// Checks handling of the InstallSnapshotReply for the final chunk. The MockLeader's
// FollowerToSnapshot is advanced to its last chunk by hand, then a reply (boolean flag
// true) for that chunk index is handled. Afterwards the behavior must still be Leader,
// followerSnapshotSize() must be 0, followerLogSize() must be 1, and the follower's
// matchIndex / nextIndex must be snapshotIndex / snapshotIndex + 1.
535 public void testHandleInstallSnapshotReplyLastChunk() {
536 new JavaTestKit(getSystem()) {{
538 ActorRef followerActor = getTestActor();
540 Map<String, String> peerAddresses = new HashMap<>();
541 peerAddresses.put(followerActor.path().toString(),
542 followerActor.path().toString());
544 final int followersLastIndex = 2;
545 final int snapshotIndex = 3;
546 final int newEntryIndex = 4;
547 final int snapshotTerm = 1;
548 final int currentTerm = 2;
550 MockRaftActorContext actorContext =
551 (MockRaftActorContext) createActorContext();
552 actorContext.setPeerAddresses(peerAddresses);
553 actorContext.setCommitIndex(followersLastIndex);
555 MockLeader leader = new MockLeader(actorContext);
557 // Ignore initial heartbeat.
558 expectMsgClass(duration("5 seconds"), AppendEntries.class);
560 Map<String, String> leadersSnapshot = new HashMap<>();
561 leadersSnapshot.put("1", "A");
562 leadersSnapshot.put("2", "B");
563 leadersSnapshot.put("3", "C");
565 // set the snapshot variables in replicatedlog
567 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
568 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
569 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
571 ByteString bs = toByteString(leadersSnapshot);
572 leader.setSnapshot(Optional.of(bs));
573 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
// Step through the chunks until the current chunk index is the last one.
574 while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
575 leader.getFollowerToSnapshot().getNextChunk();
576 leader.getFollowerToSnapshot().incrementChunkIndex();
580 actorContext.getReplicatedLog().removeFrom(0);
582 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
583 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
584 leader.getFollowerToSnapshot().getChunkIndex(), true));
586 assertTrue(raftBehavior instanceof Leader);
588 assertEquals(0, leader.followerSnapshotSize());
589 assertEquals(1, leader.followerLogSize());
590 assertNotNull(leader.getFollower(followerActor.path().toString()));
591 FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
592 assertEquals(snapshotIndex, fli.getMatchIndex());
// NOTE(review): the following assertion repeats the previous one verbatim; one of the two is redundant.
593 assertEquals(snapshotIndex, fli.getMatchIndex());
594 assertEquals(snapshotIndex + 1, fli.getNextIndex());
// Walks a three-chunk snapshot transfer driven purely by InstallSnapshotReply messages.
// SendInstallSnapshot must put chunk 1 of 3 on the follower (a MessageCollectorActor
// named "follower-reply"); each reply for the latest chunk must add exactly one more
// collected InstallSnapshot until three have been collected; a reply to the third chunk
// must not produce a fourth. Heartbeat and isolated-leader-check intervals are set to
// 9 s and 10 s -- presumably so that timers do not interfere with the counts; confirm.
598 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
599 new JavaTestKit(getSystem()) {{
601 TestActorRef<MessageCollectorActor> followerActor =
602 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
604 Map<String, String> peerAddresses = new HashMap<>();
605 peerAddresses.put("follower-reply",
606 followerActor.path().toString());
608 final int followersLastIndex = 2;
609 final int snapshotIndex = 3;
610 final int snapshotTerm = 1;
611 final int currentTerm = 2;
613 MockRaftActorContext actorContext =
614 (MockRaftActorContext) createActorContext();
615 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
617 public int getSnapshotChunkSize() {
621 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
622 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
624 actorContext.setConfigParams(configParams);
625 actorContext.setPeerAddresses(peerAddresses);
626 actorContext.setCommitIndex(followersLastIndex);
628 MockLeader leader = new MockLeader(actorContext);
630 Map<String, String> leadersSnapshot = new HashMap<>();
631 leadersSnapshot.put("1", "A");
632 leadersSnapshot.put("2", "B");
633 leadersSnapshot.put("3", "C");
635 // set the snapshot variables in replicatedlog
636 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
637 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
638 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
640 ByteString bs = toByteString(leadersSnapshot);
641 leader.setSnapshot(Optional.of(bs));
643 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
645 List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
646 InstallSnapshotMessages.InstallSnapshot.class);
648 assertEquals(1, objectList.size());
650 Object o = objectList.get(0);
651 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
653 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
655 assertEquals(1, installSnapshot.getChunkIndex());
656 assertEquals(3, installSnapshot.getTotalChunks());
// Reply to chunk 1; a second InstallSnapshot must have been collected afterwards.
658 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
659 "follower-reply", installSnapshot.getChunkIndex(), true));
661 objectList = MessageCollectorActor.getAllMatching(followerActor,
662 InstallSnapshotMessages.InstallSnapshot.class);
664 assertEquals(2, objectList.size());
666 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
// Reply to the second collected message; a third InstallSnapshot must have been collected afterwards.
668 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
669 "follower-reply", installSnapshot.getChunkIndex(), true));
671 objectList = MessageCollectorActor.getAllMatching(followerActor,
672 InstallSnapshotMessages.InstallSnapshot.class);
674 assertEquals(3, objectList.size());
676 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
678 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
679 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
680 "follower-reply", installSnapshot.getChunkIndex(), true));
682 objectList = MessageCollectorActor.getAllMatching(followerActor,
683 InstallSnapshotMessages.InstallSnapshot.class);
685 // Count should still stay at 3
686 assertEquals(3, objectList.size());
// Checks recovery from an InstallSnapshotReply carrying chunk index -1 and flag false:
// after the first chunk (1 of 3) has been collected and the collector cleared, the bad
// reply is handled, one heartbeat interval is slept, and a SendHeartBeat must make the
// leader send chunk 1 of 3 again. The follower actor (a MessageCollectorActor named
// "follower") is sent a PoisonPill at the end.
692 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
693 new JavaTestKit(getSystem()) {{
695 TestActorRef<MessageCollectorActor> followerActor =
696 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
698 Map<String, String> peerAddresses = new HashMap<>();
699 peerAddresses.put(followerActor.path().toString(),
700 followerActor.path().toString());
702 final int followersLastIndex = 2;
703 final int snapshotIndex = 3;
704 final int snapshotTerm = 1;
705 final int currentTerm = 2;
707 MockRaftActorContext actorContext =
708 (MockRaftActorContext) createActorContext();
710 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
712 public int getSnapshotChunkSize() {
716 actorContext.setPeerAddresses(peerAddresses);
717 actorContext.setCommitIndex(followersLastIndex);
719 MockLeader leader = new MockLeader(actorContext);
721 Map<String, String> leadersSnapshot = new HashMap<>();
722 leadersSnapshot.put("1", "A");
723 leadersSnapshot.put("2", "B");
724 leadersSnapshot.put("3", "C");
726 // set the snapshot variables in replicatedlog
727 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
728 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
729 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
731 ByteString bs = toByteString(leadersSnapshot);
732 leader.setSnapshot(Optional.of(bs));
734 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
// NOTE(review): the result of this getAllMatching call is discarded.
736 MessageCollectorActor.getAllMatching(followerActor,
737 InstallSnapshotMessages.InstallSnapshot.class);
739 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
740 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
741 assertNotNull(installSnapshot);
743 assertEquals(1, installSnapshot.getChunkIndex());
744 assertEquals(3, installSnapshot.getTotalChunks());
// Forget what was collected so far, so the lookup below only sees messages sent after the bad reply.
746 followerActor.underlyingActor().clear();
748 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
749 followerActor.path().toString(), -1, false));
751 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
752 TimeUnit.MILLISECONDS);
754 leader.handleMessage(leaderActor, new SendHeartBeat());
756 installSnapshot = MessageCollectorActor.getFirstMatching(
757 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
758 assertNotNull(installSnapshot);
760 assertEquals(1, installSnapshot.getChunkIndex());
761 assertEquals(3, installSnapshot.getTotalChunks());
763 followerActor.tell(PoisonPill.getInstance(), getRef());
// Checks the lastChunkHashCode carried by consecutive InstallSnapshot messages: the
// first chunk (1 of 3) must carry AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, and after
// a reply for chunk 1 (flag true) the second chunk (2 of 3) must carry the hashCode() of
// the first chunk's data. The follower actor (a MessageCollectorActor named
// "follower-chunk") is sent a PoisonPill at the end.
768 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
769 new JavaTestKit(getSystem()) {
771 TestActorRef<MessageCollectorActor> followerActor =
772 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
774 Map<String, String> peerAddresses = new HashMap<>();
775 peerAddresses.put(followerActor.path().toString(),
776 followerActor.path().toString());
778 final int followersLastIndex = 2;
779 final int snapshotIndex = 3;
780 final int snapshotTerm = 1;
781 final int currentTerm = 2;
783 MockRaftActorContext actorContext =
784 (MockRaftActorContext) createActorContext();
786 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
788 public int getSnapshotChunkSize() {
792 actorContext.setPeerAddresses(peerAddresses);
793 actorContext.setCommitIndex(followersLastIndex);
795 MockLeader leader = new MockLeader(actorContext);
797 Map<String, String> leadersSnapshot = new HashMap<>();
798 leadersSnapshot.put("1", "A");
799 leadersSnapshot.put("2", "B");
800 leadersSnapshot.put("3", "C");
802 // set the snapshot variables in replicatedlog
803 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
804 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
805 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
807 ByteString bs = toByteString(leadersSnapshot);
808 leader.setSnapshot(Optional.of(bs));
810 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
812 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
813 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
814 assertNotNull(installSnapshot);
816 assertEquals(1, installSnapshot.getChunkIndex());
817 assertEquals(3, installSnapshot.getTotalChunks());
818 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
// Remember the hash of the first chunk's data; the next chunk must report it as its predecessor's hash.
820 int hashCode = installSnapshot.getData().hashCode();
822 followerActor.underlyingActor().clear();
824 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
826 installSnapshot = MessageCollectorActor.getFirstMatching(
827 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
828 assertNotNull(installSnapshot);
830 assertEquals(2, installSnapshot.getChunkIndex());
831 assertEquals(3, installSnapshot.getTotalChunks());
832 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
834 followerActor.tell(PoisonPill.getInstance(), getRef());
// Exercises the FollowerToSnapshot chunking created through MockLeader: the serialized
// map is walked in steps of 50 bytes, and for each step the size of getNextChunk() and
// the reported chunk index are compared with locally tracked values (j - i and
// chunkIndex). After the walk, getTotalChunks() must equal the local chunk count.
839 public void testFollowerToSnapshotLogic() {
841 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
843 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
845 public int getSnapshotChunkSize() {
850 MockLeader leader = new MockLeader(actorContext);
852 Map<String, String> leadersSnapshot = new HashMap<>();
853 leadersSnapshot.put("1", "A");
854 leadersSnapshot.put("2", "B");
855 leadersSnapshot.put("3", "C");
857 ByteString bs = toByteString(leadersSnapshot);
858 byte[] barray = bs.toByteArray();
860 leader.createFollowerToSnapshot("followerId", bs);
861 assertEquals(bs.size(), barray.length);
864 for (int i=0; i < barray.length; i = i + 50) {
868 if (i + 50 > barray.length) {
872 ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
873 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
874 assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
// Mark the chunk as sent and move on, unless this was the last chunk.
876 leader.getFollowerToSnapshot().markSendStatus(true);
877 if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
878 leader.getFollowerToSnapshot().incrementChunkIndex();
882 assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
// Factory hook overridden from the superclass: returns a new Leader built on the given
// actor context.
886 @Override protected RaftActorBehavior createBehavior(
887 RaftActorContext actorContext) {
888 return new Leader(actorContext);
// Overrides the superclass hook to build the context around the shared leaderActor field
// by delegating to createActorContext(ActorRef).
891 @Override protected RaftActorContext createActorContext() {
892 return createActorContext(leaderActor);
// Builds a MockRaftActorContext with id "test" for the given actor and installs config
// params with a 50 ms heartbeat interval and an election timeout factor of 100000.
896 protected RaftActorContext createActorContext(ActorRef actorRef) {
897 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
898 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
899 configParams.setElectionTimeoutFactor(100000);
900 MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), actorRef);
901 context.setConfigParams(configParams);
// Serializes the given map with an ObjectOutputStream into an in-memory byte array and
// returns a ByteString copy of those bytes. An IOException is turned into a test failure
// through Assert.fail.
905 private ByteString toByteString(Map<String, String> state) {
906 ByteArrayOutputStream b = null;
907 ObjectOutputStream o = null;
910 b = new ByteArrayOutputStream();
911 o = new ObjectOutputStream(b);
912 o.writeObject(state);
913 byte[] snapshotBytes = b.toByteArray();
914 return ByteString.copyFrom(snapshotBytes);
924 } catch (IOException e) {
925 Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
// Test actor built on MessageCollectorActor that can additionally drive a Raft behavior:
// when the behavior field is set, each received message is passed to behavior.handleMessage
// together with the current sender.
930 public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
// Behavior to forward to; tests assign it directly and it stays null until then.
931 AbstractRaftActorBehavior behavior;
933 @Override public void onReceive(Object message) throws Exception {
934 if(behavior != null) {
935 behavior.handleMessage(sender(), message);
// NOTE(review): brace lines are elided from this listing, so whether the super call below sits
// inside or after the if-block is not visible - presumably after it, so that every message is
// still handed to MessageCollectorActor; confirm.
938 super.onReceive(message);
// Props factory for creating this actor.
941 public static Props props() {
942 return Props.create(ForwardMessageToBehaviorActor.class);
// Verifies the first AppendEntries/AppendEntriesReply exchange after a Leader is created whose
// commit index (1) is below the last index of its log, against a follower with an identical
// log and commit index.
947 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
948 new JavaTestKit(getSystem()) {{
// Both sides are ForwardMessageToBehaviorActor instances so received messages can be inspected.
949 TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
950 Props.create(ForwardMessageToBehaviorActor.class));
952 MockRaftActorContext leaderActorContext =
953 new MockRaftActorContext("leader", getSystem(), leaderActor);
955 TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
956 ForwardMessageToBehaviorActor.props());
958 MockRaftActorContext followerActorContext =
959 new MockRaftActorContext("follower", getSystem(), followerActor);
// The follower actor forwards everything it receives to a real Follower behavior.
961 Follower follower = new Follower(followerActorContext);
962 followerActor.underlyingActor().behavior = follower;
964 Map<String, String> peerAddresses = new HashMap<>();
965 peerAddresses.put("follower", followerActor.path().toString());
967 leaderActorContext.setPeerAddresses(peerAddresses);
// Clear the mock context's existing log before installing the one used by this test.
969 leaderActorContext.getReplicatedLog().removeFrom(0);
// presumably createEntries(0, 3, 1) yields indices 0..2 in term 1, which matches the
// logLastIndex/logLastTerm assertions below - confirm against MockReplicatedLogBuilder
972 leaderActorContext.setReplicatedLog(
973 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
975 leaderActorContext.setCommitIndex(1);
977 followerActorContext.getReplicatedLog().removeFrom(0);
979 // follower too has the exact same log entries and has the same commit index
980 followerActorContext.setReplicatedLog(
981 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
983 followerActorContext.setCommitIndex(1);
// The test expects an AppendEntries to have reached the follower once the Leader exists.
985 Leader leader = new Leader(leaderActorContext);
987 AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
988 assertNotNull(appendEntries);
// Expected first message: leader commit 1, no entries, previous log index 0.
990 assertEquals(1, appendEntries.getLeaderCommit());
991 assertEquals(0, appendEntries.getEntries().size());
992 assertEquals(0, appendEntries.getPrevLogIndex());
// The follower's reply is collected by the leader actor.
994 AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
995 leaderActor, AppendEntriesReply.class);
996 assertNotNull(appendEntriesReply);
998 assertEquals(2, appendEntriesReply.getLogLastIndex());
999 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): the two assertions below repeat the two directly above, and the existing
// comment does not describe a distinct check - likely a copy/paste leftover.
1001 // follower returns its next index
1002 assertEquals(2, appendEntriesReply.getLogLastIndex());
1003 assertEquals(1, appendEntriesReply.getLogLastTerm());
// Verifies two heartbeat rounds between a Leader with commit index 1 and a follower that holds
// the same log but starts with commit index 2; the final assertion expects the follower's
// commit index to be 1.
1009 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1010 new JavaTestKit(getSystem()) {{
1011 TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
1012 Props.create(ForwardMessageToBehaviorActor.class));
1014 MockRaftActorContext leaderActorContext =
1015 new MockRaftActorContext("leader", getSystem(), leaderActor);
1017 TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
1018 ForwardMessageToBehaviorActor.props());
1020 MockRaftActorContext followerActorContext =
1021 new MockRaftActorContext("follower", getSystem(), followerActor);
// The follower actor forwards everything it receives to a real Follower behavior.
1023 Follower follower = new Follower(followerActorContext);
1024 followerActor.underlyingActor().behavior = follower;
1026 Map<String, String> peerAddresses = new HashMap<>();
1027 peerAddresses.put("follower", followerActor.path().toString());
1029 leaderActorContext.setPeerAddresses(peerAddresses);
// Clear the mock context's existing log before installing the one used by this test.
1031 leaderActorContext.getReplicatedLog().removeFrom(0);
1033 leaderActorContext.setReplicatedLog(
1034 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1036 leaderActorContext.setCommitIndex(1);
1038 followerActorContext.getReplicatedLog().removeFrom(0);
1040 followerActorContext.setReplicatedLog(
1041 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1043 // follower has the same log entries but its commit index > leaders commit index
1044 followerActorContext.setCommitIndex(2);
1046 Leader leader = new Leader(leaderActorContext);
1048 // Initial heartbeat
1049 AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1050 assertNotNull(appendEntries);
1052 assertEquals(1, appendEntries.getLeaderCommit());
1053 assertEquals(0, appendEntries.getEntries().size());
1054 assertEquals(0, appendEntries.getPrevLogIndex());
1056 AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
1057 leaderActor, AppendEntriesReply.class);
1058 assertNotNull(appendEntriesReply);
1060 assertEquals(2, appendEntriesReply.getLogLastIndex());
1061 assertEquals(1, appendEntriesReply.getLogLastTerm());
// From here on the leader actor forwards to the Leader behavior; the first reply is also fed
// to the leader directly.
1063 leaderActor.underlyingActor().behavior = leader;
1064 leader.handleMessage(followerActor, appendEntriesReply);
// Drop the messages collected so far so the next lookups only see the second round.
1066 leaderActor.underlyingActor().clear();
1067 followerActor.underlyingActor().clear();
// Wait one heartbeat interval before triggering the next heartbeat by hand.
1069 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1070 TimeUnit.MILLISECONDS);
1072 leader.handleMessage(leaderActor, new SendHeartBeat());
1074 appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1075 assertNotNull(appendEntries);
// Second round: still no entries and leader commit 1, but the previous log index is now 2.
1077 assertEquals(1, appendEntries.getLeaderCommit());
1078 assertEquals(0, appendEntries.getEntries().size());
1079 assertEquals(2, appendEntries.getPrevLogIndex());
1081 appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
1082 assertNotNull(appendEntriesReply);
1084 assertEquals(2, appendEntriesReply.getLogLastIndex());
1085 assertEquals(1, appendEntriesReply.getLogLastTerm());
// The follower started at commit index 2; the test expects it to read 1 after the exchange.
1087 assertEquals(1, followerActorContext.getCommitIndex());
// Verifies that handling an AppendEntriesReply carrying a false flag from a known follower
// returns a behavior whose state is still Leader.
1092 public void testHandleAppendEntriesReplyFailure(){
1093 new JavaTestKit(getSystem()) {
1096 ActorRef leaderActor =
1097 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1099 ActorRef followerActor =
1100 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1103 MockRaftActorContext leaderActorContext =
1104 new MockRaftActorContext("leader", getSystem(), leaderActor);
// Register the follower as a peer under the id used in the reply below.
1106 Map<String, String> peerAddresses = new HashMap<>();
1107 peerAddresses.put("follower-1",
1108 followerActor.path().toString());
1110 leaderActorContext.setPeerAddresses(peerAddresses);
1112 Leader leader = new Leader(leaderActorContext);
// presumably the arguments are (followerId, term, success, logLastIndex, logLastTerm) -
// verify against the AppendEntriesReply constructor
1114 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1116 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1118 assertEquals(RaftState.Leader, raftActorBehavior.state());
// Verifies the leader's reaction to an AppendEntriesReply carrying a true flag: starting from
// commit index 1 and last applied 1, a reply reporting index 2 is expected to move both to 2
// and to produce an ApplyLogEntries and a single ApplyState message at the leader actor.
1124 public void testHandleAppendEntriesReplySuccess() throws Exception {
1125 new JavaTestKit(getSystem()) {
1128 ActorRef leaderActor =
1129 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1131 ActorRef followerActor =
1132 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1135 MockRaftActorContext leaderActorContext =
1136 new MockRaftActorContext("leader", getSystem(), leaderActor);
1138 leaderActorContext.setReplicatedLog(
1139 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1141 Map<String, String> peerAddresses = new HashMap<>();
1142 peerAddresses.put("follower-1",
1143 followerActor.path().toString());
// Leader state before the reply: commit index 1, last applied 1, term 1.
1145 leaderActorContext.setPeerAddresses(peerAddresses);
1146 leaderActorContext.setCommitIndex(1);
1147 leaderActorContext.setLastApplied(1);
1148 leaderActorContext.getTermInformation().update(1, "leader");
1150 Leader leader = new Leader(leaderActorContext);
// presumably the arguments are (followerId, term, success, logLastIndex, logLastTerm) -
// verify against the AppendEntriesReply constructor
1152 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1154 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1156 assertEquals(RaftState.Leader, raftActorBehavior.state());
1158 assertEquals(2, leaderActorContext.getCommitIndex());
// The leader actor is expected to have collected an ApplyLogEntries message.
1160 ApplyLogEntries applyLogEntries =
1161 MessageCollectorActor.getFirstMatching(leaderActor,
1162 ApplyLogEntries.class);
1164 assertNotNull(applyLogEntries);
1166 assertEquals(2, leaderActorContext.getLastApplied());
1168 assertEquals(2, applyLogEntries.getToIndex());
// NOTE(review): the remainder of this call (its class argument) is on a line elided from this
// listing; presumably it matches ApplyState, given the cast below.
1170 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1173 assertEquals(1,applyStateList.size());
1175 ApplyState applyState = (ApplyState) applyStateList.get(0);
1177 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
// Verifies that an AppendEntriesReply naming "follower-1", for which this test registers no peer
// address, still yields a behavior in the Leader state.
1183 public void testHandleAppendEntriesReplyUnknownFollower(){
1184 new JavaTestKit(getSystem()) {
1187 ActorRef leaderActor =
1188 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1190 MockRaftActorContext leaderActorContext =
1191 new MockRaftActorContext("leader", getSystem(), leaderActor);
1193 Leader leader = new Leader(leaderActorContext);
1195 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
// The test kit's own ref is used as the sender of the reply.
1197 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1199 assertEquals(RaftState.Leader, raftActorBehavior.state());
// Verifies that a Leader stays in the Leader state when handed a RequestVoteReply, both with a
// true and with a false second constructor argument (presumably voteGranted - verify).
1205 public void testHandleRequestVoteReply(){
1206 new JavaTestKit(getSystem()) {
1209 ActorRef leaderActor =
1210 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1212 MockRaftActorContext leaderActorContext =
1213 new MockRaftActorContext("leader", getSystem(), leaderActor);
1215 Leader leader = new Leader(leaderActorContext);
1217 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1219 assertEquals(RaftState.Leader, raftActorBehavior.state());
1221 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1223 assertEquals(RaftState.Leader, raftActorBehavior.state());
// Verifies that an IsolatedLeaderCheck on a leader with an empty peer map returns a behavior
// that is still an instance of Leader.
1228 public void testIsolatedLeaderCheckNoFollowers() {
1229 new JavaTestKit(getSystem()) {{
1230 ActorRef leaderActor = getTestActor();
1232 MockRaftActorContext leaderActorContext =
1233 new MockRaftActorContext("leader", getSystem(), leaderActor);
// Deliberately empty: this leader has no peers.
1235 Map<String, String> peerAddresses = new HashMap<>();
1236 leaderActorContext.setPeerAddresses(peerAddresses);
1238 Leader leader = new Leader(leaderActorContext);
1239 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1240 Assert.assertTrue(behavior instanceof Leader);
// Verifies the IsolatedLeaderCheck outcome with two peers: the behavior stays a Leader while
// both, or one of the two, followers are marked active, and becomes an IsolatedLeader once
// both are marked inactive.
1245 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1246 new JavaTestKit(getSystem()) {{
// NOTE(review): both refs come from getTestActor() on the same test kit; if that returns the
// same ActorRef each time (verify), the two "followers" are one actor and the first PoisonPill
// below already stops both.
1248 ActorRef followerActor1 = getTestActor();
1249 ActorRef followerActor2 = getTestActor();
1251 MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1253 Map<String, String> peerAddresses = new HashMap<>();
1254 peerAddresses.put("follower-1", followerActor1.path().toString());
1255 peerAddresses.put("follower-2", followerActor2.path().toString());
1257 leaderActorContext.setPeerAddresses(peerAddresses);
1259 Leader leader = new Leader(leaderActorContext);
// Stop the scheduled check; the test sends IsolatedLeaderCheck by hand instead.
1260 leader.stopIsolatedLeaderCheckSchedule();
1262 leader.markFollowerActive("follower-1");
1263 leader.markFollowerActive("follower-2");
// leaderActor is not declared in this test; it resolves to a name from the enclosing scope.
1264 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1265 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1266 behavior instanceof Leader);
1268 // kill 1 follower and verify if that got killed
1269 final JavaTestKit probe = new JavaTestKit(getSystem());
1270 probe.watch(followerActor1);
1271 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1272 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
// NOTE(review): arguments are in (actual, expected) order; JUnit's convention is (expected, actual).
1273 assertEquals(termMsg1.getActor(), followerActor1);
1275 leader.markFollowerInActive("follower-1");
1276 leader.markFollowerActive("follower-2");
1277 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1278 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1279 behavior instanceof Leader);
1281 // kill 2nd follower and leader should change to Isolated leader
// NOTE(review): a PoisonPill is sent here with a null sender before the probe starts watching,
// and then sent again two lines below with ActorRef.noSender() - the first send looks redundant.
1282 followerActor2.tell(PoisonPill.getInstance(), null);
1283 probe.watch(followerActor2);
1284 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1285 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1286 assertEquals(termMsg2.getActor(), followerActor2);
1288 leader.markFollowerInActive("follower-2");
1289 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1290 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1291 behavior instanceof IsolatedLeader);
// Verifies that handling an AppendEntriesReply makes the leader send the next log entry:
// after the follower acknowledges entry 0, handleAppendEntriesReply is expected to result in an
// AppendEntries whose first entry has index 1.
1298 public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1299 new JavaTestKit(getSystem()) {{
1300 TestActorRef<MessageCollectorActor> leaderActor = TestActorRef.create(getSystem(),
1301 Props.create(MessageCollectorActor.class));
1303 MockRaftActorContext leaderActorContext =
1304 new MockRaftActorContext("leader", getSystem(), leaderActor);
// Shared configuration for both sides; only the isolated-leader check interval is changed.
// NOTE(review): the commented-out heartbeat setting below is dead code.
1306 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1307 //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1308 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1310 leaderActorContext.setConfigParams(configParams);
1312 TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
1313 ForwardMessageToBehaviorActor.props());
1315 MockRaftActorContext followerActorContext =
1316 new MockRaftActorContext("follower-reply", getSystem(), followerActor);
1318 followerActorContext.setConfigParams(configParams);
// The follower actor forwards everything it receives to a real Follower behavior.
1320 Follower follower = new Follower(followerActorContext);
1321 followerActor.underlyingActor().behavior = follower;
1323 Map<String, String> peerAddresses = new HashMap<>();
1324 peerAddresses.put("follower-reply",
1325 followerActor.path().toString());
1327 leaderActorContext.setPeerAddresses(peerAddresses);
// Both sides start with the log cleared from index 0 and commit/last-applied reset to -1.
1329 leaderActorContext.getReplicatedLog().removeFrom(0);
1330 leaderActorContext.setCommitIndex(-1);
1331 leaderActorContext.setLastApplied(-1);
1333 followerActorContext.getReplicatedLog().removeFrom(0);
1334 followerActorContext.setCommitIndex(-1);
1335 followerActorContext.setLastApplied(-1);
1337 Leader leader = new Leader(leaderActorContext);
// Reply to the leader's initial message, collected by the leader actor and then fed to the
// leader behavior by hand.
1339 AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
1340 leaderActor, AppendEntriesReply.class);
1341 assertNotNull(appendEntriesReply);
// NOTE(review): leftover debug output to stdout.
1342 System.out.println("appendEntriesReply: "+appendEntriesReply);
1343 leader.handleMessage(followerActor, appendEntriesReply);
1345 // Clear initial heartbeat messages
1347 leaderActor.underlyingActor().clear();
1348 followerActor.underlyingActor().clear();
// Give the leader a log built by createEntries(0, 3, 1) with commit index and last applied 1,
// while the follower's log is still empty.
1351 leaderActorContext.setReplicatedLog(
1352 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1353 leaderActorContext.setCommitIndex(1);
1354 leaderActorContext.setLastApplied(1);
// Wait one heartbeat interval before triggering the heartbeat by hand.
1356 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1357 TimeUnit.MILLISECONDS);
1359 leader.handleMessage(leaderActor, new SendHeartBeat());
1361 AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1362 assertNotNull(appendEntries);
1364 // Should send first log entry
1365 assertEquals(1, appendEntries.getLeaderCommit());
1366 assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1367 assertEquals(-1, appendEntries.getPrevLogIndex());
1369 appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
1370 assertNotNull(appendEntriesReply);
// The follower reports entry 0 (term 1) as its last log entry.
1372 assertEquals(1, appendEntriesReply.getLogLastTerm());
1373 assertEquals(0, appendEntriesReply.getLogLastIndex());
// Clear the follower's collected messages so the next lookup only sees what the reply triggers.
1375 followerActor.underlyingActor().clear();
1377 leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1379 appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
1380 assertNotNull(appendEntries);
1382 // Should send second log entry
1383 assertEquals(1, appendEntries.getLeaderCommit());
1384 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
// Leader subclass used by the snapshot-chunking test: it keeps a reference to the
// FollowerToSnapshot it creates so the test can inspect it.
// NOTE(review): the constructor and getter bodies are elided from this listing; presumably the
// constructor calls super(context) and the getter returns fts - confirm against the full file.
1388 class MockLeader extends Leader {
// Most recently created snapshot-chunking state.
1390 FollowerToSnapshot fts;
1392 public MockLeader(RaftActorContext context){
1396 public FollowerToSnapshot getFollowerToSnapshot() {
// Creates a FollowerToSnapshot over the given bytes, remembers it in fts and registers it for
// followerId through setFollowerSnapshot.
1400 public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1401 fts = new FollowerToSnapshot(bs);
1402 setFollowerSnapshot(followerId, fts);
// DefaultConfigParamsImpl variant whose election timeout interval and snapshot chunk size are
// fixed by constructor arguments.
// NOTE(review): annotation and brace lines are elided from this listing, and the class may
// continue past the visible lines.
1406 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
// Election timeout in milliseconds, as passed to the constructor.
1408 private final long electionTimeOutIntervalMillis;
// Value returned by getSnapshotChunkSize().
1409 private final int snapshotChunkSize;
1411 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1413 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1414 this.snapshotChunkSize = snapshotChunkSize;
// Wraps the configured millisecond value in a FiniteDuration.
1418 public FiniteDuration getElectionTimeOutInterval() {
1419 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1423 public int getSnapshotChunkSize() {
1424 return snapshotChunkSize;