1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.collect.ImmutableMap;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.util.Collections;
16 import java.util.HashMap;
17 import java.util.List;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.Snapshot;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
33 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
34 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
43 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
44 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
45 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
46 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
47 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
48 import scala.concurrent.duration.FiniteDuration;
50 public class LeaderTest extends AbstractLeaderTest {
// Identifiers used for the follower and the leader in these tests.
52 static final String FOLLOWER_ID = "follower";
53 public static final String LEADER_ID = "leader";
// Test actors created through actorFactory; both are ForwardMessageToBehaviorActor instances
// with generated ids. The tests inspect the messages they receive via MessageCollectorActor.
55 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
56 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
58 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
59 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
// The Leader behavior under test; assigned by the individual test methods.
61 private Leader leader;
// Payload version constant (5).
62 private final short payloadVersion = 5;
66 public void tearDown() throws Exception {
// Verifies that handleMessage() still returns a Leader behavior when it is handed a
// message it does not recognise (the plain string "foo").
75 public void testHandleMessageForUnknownMessage() throws Exception {
76 logStart("testHandleMessageForUnknownMessage");
78 leader = new Leader(createActorContext());
80 // handle message should return the Leader state when it receives an
82 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
83 Assert.assertTrue(behavior instanceof Leader);
// Verifies the AppendEntries messages the follower receives: first an empty one once the
// Leader has been constructed, then - after a simulated successful reply and an explicit
// SendHeartBeat - one that carries the last log entry. Both must report the payload version.
87 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
88 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
90 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): this local shadows the payloadVersion field, which holds the same value (5).
91 short payloadVersion = (short)5;
92 actorContext.setPayloadVersion(payloadVersion);
95 actorContext.getTermInformation().update(term, "");
97 leader = new Leader(actorContext);
99 // Leader should send an immediate heartbeat with no entries as follower is inactive.
100 long lastIndex = actorContext.getReplicatedLog().lastIndex();
101 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
102 assertEquals("getTerm", term, appendEntries.getTerm());
103 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
104 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
105 assertEquals("Entries size", 0, appendEntries.getEntries().size());
106 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
108 // The follower would normally reply - simulate that explicitly here.
// The reply reports lastIndex - 1, i.e. the follower is one entry behind the leader's log.
109 leader.handleMessage(followerActor, new AppendEntriesReply(
110 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
111 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the messages collected so far so the next expectFirstMatching sees only new ones.
113 followerActor.underlyingActor().clear();
115 // Sleep for the heartbeat interval so AppendEntries is sent.
116 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
117 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
119 leader.handleMessage(leaderActor, new SendHeartBeat());
121 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
122 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
123 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
124 assertEquals("Entries size", 1, appendEntries.getEntries().size());
125 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
126 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
127 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
// Helper: builds a "foo" MockPayload and a MockReplicatedLogEntry, appends the entry to the
// context's replicated log and passes a Replicate message for it to the leader (sender is
// leaderActor; client actor and identifier are null). Returns whatever handleMessage returns.
// NOTE(review): presumably `index` becomes the log index of the new entry - the constructor
// arguments are not visible here; confirm.
131 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
132 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
133 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
135 actorContext.getReplicatedLog().append(newEntry);
136 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
// Verifies that, after the follower has acknowledged the initial heartbeat, a Replicate for
// a new entry makes the leader send an AppendEntries containing exactly that entry
// (index lastIndex + 1, current term, payload "foo") and that the behavior stays Leader.
140 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
141 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
143 MockRaftActorContext actorContext = createActorContextWithFollower();
146 actorContext.getTermInformation().update(term, "");
148 leader = new Leader(actorContext);
150 // Leader will send an immediate heartbeat - ignore it.
151 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
153 // The follower would normally reply - simulate that explicitly here.
154 long lastIndex = actorContext.getReplicatedLog().lastIndex();
155 leader.handleMessage(followerActor, new AppendEntriesReply(
156 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
157 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Forget the initial heartbeat so only the AppendEntries triggered by Replicate is examined.
159 followerActor.underlyingActor().clear();
161 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
163 // State should not change
164 assertTrue(raftBehavior instanceof Leader);
166 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
167 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
168 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
169 assertEquals("Entries size", 1, appendEntries.getEntries().size());
170 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
171 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
172 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// Verifies that five back-to-back Replicate messages, sent without any AppendEntriesReply in
// between and well inside the (5 second) heartbeat interval, produce only one AppendEntries.
176 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
// Log this test's own name (previously the name of a different test was logged).
177 logStart("testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent");
179 MockRaftActorContext actorContext = createActorContextWithFollower();
// Long heartbeat interval so that no heartbeat-driven send can interfere with the count below.
180 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
182 public FiniteDuration getHeartBeatInterval() {
183 return FiniteDuration.apply(5, TimeUnit.SECONDS);
188 actorContext.getTermInformation().update(term, "");
190 leader = new Leader(actorContext);
192 // Leader will send an immediate heartbeat - ignore it.
193 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
195 // The follower would normally reply - simulate that explicitly here.
196 long lastIndex = actorContext.getReplicatedLog().lastIndex();
197 leader.handleMessage(followerActor, new AppendEntriesReply(
198 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
199 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
201 followerActor.underlyingActor().clear();
203 for(int i=0;i<5;i++) {
204 sendReplicate(actorContext, lastIndex+i+1);
207 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
208 // We expect only 1 message to be sent because of two reasons,
209 // - an append entries reply was not received
210 // - the heartbeat interval has not expired
211 // In this scenario if multiple messages are sent they would likely be duplicates
212 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
// Verifies that a Replicate is forwarded to the follower whenever the previous one has been
// acknowledged: three replicate/reply rounds followed by two unacknowledged replicates must
// yield four AppendEntries, carrying the entries lastIndex + 1 .. lastIndex + 4 in order.
216 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
217 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
219 MockRaftActorContext actorContext = createActorContextWithFollower();
// Long heartbeat interval so that no heartbeat-driven send can interfere with the count below.
220 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
222 public FiniteDuration getHeartBeatInterval() {
223 return FiniteDuration.apply(5, TimeUnit.SECONDS);
228 actorContext.getTermInformation().update(term, "");
230 leader = new Leader(actorContext);
232 // Leader will send an immediate heartbeat - ignore it.
233 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
235 // The follower would normally reply - simulate that explicitly here.
236 long lastIndex = actorContext.getReplicatedLog().lastIndex();
237 leader.handleMessage(followerActor, new AppendEntriesReply(
238 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
239 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
241 followerActor.underlyingActor().clear();
// Rounds 1-3: each replicated entry is acknowledged by the follower straight away.
243 for(int i=0;i<3;i++) {
244 sendReplicate(actorContext, lastIndex+i+1);
245 leader.handleMessage(followerActor, new AppendEntriesReply(
246 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
// Rounds 4-5: no acknowledgement is simulated.
250 for(int i=3;i<5;i++) {
251 sendReplicate(actorContext, lastIndex + i + 1);
254 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
255 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
256 // get sent to the follower - but not the 5th
257 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
259 for(int i=0;i<4;i++) {
// Expected value first, actual second; the expectation is derived from lastIndex (the index
// passed to sendReplicate) instead of a hard-coded offset.
260 long actual = allMessages.get(i).getEntries().get(0).getIndex();
261 assertEquals("Entry getIndex", lastIndex + i + 1, actual);
// Verifies that an unacknowledged entry is sent again on the next heartbeat: one Replicate
// followed (after more than the 500 ms heartbeat interval) by a SendHeartBeat must yield two
// AppendEntries, each containing the same single entry at index lastIndex + 1.
266 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
267 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
269 MockRaftActorContext actorContext = createActorContextWithFollower();
270 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
272 public FiniteDuration getHeartBeatInterval() {
273 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
278 actorContext.getTermInformation().update(term, "");
280 leader = new Leader(actorContext);
282 // Leader will send an immediate heartbeat - ignore it.
283 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
285 // The follower would normally reply - simulate that explicitly here.
286 long lastIndex = actorContext.getReplicatedLog().lastIndex();
287 leader.handleMessage(followerActor, new AppendEntriesReply(
288 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
289 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
291 followerActor.underlyingActor().clear();
293 sendReplicate(actorContext, lastIndex+1);
295 // Wait slightly longer than heartbeat duration
296 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
298 leader.handleMessage(leaderActor, new SendHeartBeat());
300 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
301 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// First message: the AppendEntries triggered by the Replicate.
303 assertEquals(1, allMessages.get(0).getEntries().size());
304 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
// Second message: the duplicate sent on heartbeat. Check message 1 here, not message 0 again.
305 assertEquals(1, allMessages.get(1).getEntries().size());
306 assertEquals(lastIndex+1, allMessages.get(1).getEntries().get(0).getIndex());
// Verifies that every SendHeartBeat handled after waiting longer (150 ms) than the configured
// heartbeat interval (100 ms) results in an AppendEntries to the follower: three heartbeats
// must produce three messages.
311 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
312 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
314 MockRaftActorContext actorContext = createActorContextWithFollower();
315 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
317 public FiniteDuration getHeartBeatInterval() {
318 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
323 actorContext.getTermInformation().update(term, "");
325 leader = new Leader(actorContext);
327 // Leader will send an immediate heartbeat - ignore it.
328 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
330 // The follower would normally reply - simulate that explicitly here.
331 long lastIndex = actorContext.getReplicatedLog().lastIndex();
332 leader.handleMessage(followerActor, new AppendEntriesReply(
333 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
334 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
336 followerActor.underlyingActor().clear();
// Each iteration waits past the heartbeat interval before handing the leader a SendHeartBeat.
338 for(int i=0;i<3;i++) {
339 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
340 leader.handleMessage(leaderActor, new SendHeartBeat());
343 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
344 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
// Verifies that a Replicate handled directly after a SendHeartBeat still results in its own
// AppendEntries: the follower must receive an empty heartbeat followed by a message that
// carries one entry.
348 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
349 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
351 MockRaftActorContext actorContext = createActorContextWithFollower();
352 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
354 public FiniteDuration getHeartBeatInterval() {
355 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
360 actorContext.getTermInformation().update(term, "");
362 leader = new Leader(actorContext);
364 // Leader will send an immediate heartbeat - ignore it.
365 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
367 // The follower would normally reply - simulate that explicitly here.
368 long lastIndex = actorContext.getReplicatedLog().lastIndex();
369 leader.handleMessage(followerActor, new AppendEntriesReply(
370 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
371 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
373 followerActor.underlyingActor().clear();
// Wait past the 100 ms heartbeat interval, then heartbeat and replicate back to back.
375 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
376 leader.handleMessage(leaderActor, new SendHeartBeat());
377 sendReplicate(actorContext, lastIndex+1);
379 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
380 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// Message 0 is the heartbeat (no entries); message 1 carries the replicated entry.
382 assertEquals(0, allMessages.get(0).getEntries().size());
383 assertEquals(1, allMessages.get(1).getEntries().size());
// Verifies Replicate handling on a context created with createActorContext() (per the test
// name, a leader without followers): after handleMessage the commit index equals the new
// entry's index and leaderActor has received one ApplyState per index 1..newLogIndex, the
// last of which carries the new entry's data and the "state-id" identifier.
388 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
389 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
391 MockRaftActorContext actorContext = createActorContext();
393 leader = new Leader(actorContext);
395 actorContext.setLastApplied(0);
397 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
398 long term = actorContext.getTermInformation().getCurrentTerm();
399 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
400 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
402 actorContext.getReplicatedLog().append(newEntry);
404 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
405 new Replicate(leaderActor, "state-id", newEntry));
407 // State should not change
408 assertTrue(raftBehavior instanceof Leader);
410 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
412 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
413 // one since lastApplied state is 0.
414 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
415 leaderActor, ApplyState.class);
416 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
// ApplyState i is expected to be for log index i + 1 (indexes 1..newLogIndex).
418 for(int i = 0; i <= newLogIndex - 1; i++ ) {
419 ApplyState applyState = applyStateList.get(i);
420 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
421 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
424 ApplyState last = applyStateList.get((int) newLogIndex - 1);
425 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
426 assertEquals("getIdentifier", "state-id", last.getIdentifier());
// Verifies heartbeat handling while a snapshot install to the follower is in progress:
// with a chunk outstanding (no InstallSnapshotReply yet) a SendHeartBeat yields an
// AppendEntries without entries; once the chunk is marked as successfully sent, the next
// SendHeartBeat yields an InstallSnapshot whose last included index is the commit index.
430 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
431 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
433 MockRaftActorContext actorContext = createActorContextWithFollower();
// Snapshot state that is serialized with toByteString() below.
435 Map<String, String> leadersSnapshot = new HashMap<>();
436 leadersSnapshot.put("1", "A");
437 leadersSnapshot.put("2", "B");
438 leadersSnapshot.put("3", "C");
441 actorContext.getReplicatedLog().removeFrom(0);
443 final int commitIndex = 3;
444 final int snapshotIndex = 2;
445 final int newEntryIndex = 4;
446 final int snapshotTerm = 1;
447 final int currentTerm = 2;
449 // set the snapshot variables in replicatedlog
450 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
451 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
452 actorContext.setCommitIndex(commitIndex);
453 //set follower timeout to 2 mins, helps during debugging
454 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
456 leader = new Leader(actorContext);
458 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
459 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// NOTE(review): `entry` is not referenced again in the lines visible here - confirm whether
// it is still needed.
462 ReplicatedLogImplEntry entry =
463 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
464 new MockRaftActorContext.MockPayload("D"));
466 //update follower timestamp
467 leader.markFollowerActive(FOLLOWER_ID);
// Install the snapshot on the leader and register an in-progress FollowerToSnapshot for the
// follower directly, instead of going through a SendInstallSnapshot message.
469 ByteString bs = toByteString(leadersSnapshot);
470 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
471 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
472 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
473 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
475 //send first chunk and no InstallSnapshotReply received yet
477 fts.incrementChunkIndex();
479 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
480 TimeUnit.MILLISECONDS);
482 leader.handleMessage(leaderActor, new SendHeartBeat());
484 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
486 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
488 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
490 //InstallSnapshotReply received
491 fts.markSendStatus(true);
493 leader.handleMessage(leaderActor, new SendHeartBeat());
495 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
497 assertEquals(commitIndex, is.getLastIncludedIndex());
// Verifies that, with the log's snapshot index (3) ahead of the commit index set for the
// follower scenario (2), handling a Replicate for a new entry keeps the behavior as Leader
// and leaves the snapshot manager in the capturing state.
501 public void testSendAppendEntriesSnapshotScenario() throws Exception {
502 logStart("testSendAppendEntriesSnapshotScenario");
504 MockRaftActorContext actorContext = createActorContextWithFollower();
506 Map<String, String> leadersSnapshot = new HashMap<>();
507 leadersSnapshot.put("1", "A");
508 leadersSnapshot.put("2", "B");
509 leadersSnapshot.put("3", "C");
512 actorContext.getReplicatedLog().removeFrom(0);
514 final int followersLastIndex = 2;
515 final int snapshotIndex = 3;
516 final int newEntryIndex = 4;
517 final int snapshotTerm = 1;
518 final int currentTerm = 2;
520 // set the snapshot variables in replicatedlog
521 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
522 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
523 actorContext.setCommitIndex(followersLastIndex);
525 leader = new Leader(actorContext);
527 // Leader will send an immediate heartbeat - ignore it.
528 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// New entry at index 4 in term 2, appended to the log before it is replicated.
531 ReplicatedLogImplEntry entry =
532 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
533 new MockRaftActorContext.MockPayload("D"));
535 actorContext.getReplicatedLog().append(entry);
537 //update follower timestamp
538 leader.markFollowerActive(FOLLOWER_ID);
540 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
541 RaftActorBehavior raftBehavior = leader.handleMessage(
542 leaderActor, new Replicate(null, "state-id", entry));
544 assertTrue(raftBehavior instanceof Leader);
546 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
// Verifies that, when the leader has no snapshot set, handling a Replicate starts a snapshot
// capture that is flagged as install-snapshot-initiated and carries the expected last-applied
// and last index/term values, and that a second Replicate while the capture is in progress
// leaves the very same CaptureSnapshot instance in place.
550 public void testInitiateInstallSnapshot() throws Exception {
551 logStart("testInitiateInstallSnapshot");
553 MockRaftActorContext actorContext = createActorContextWithFollower();
555 Map<String, String> leadersSnapshot = new HashMap<>();
556 leadersSnapshot.put("1", "A");
557 leadersSnapshot.put("2", "B");
558 leadersSnapshot.put("3", "C");
561 actorContext.getReplicatedLog().removeFrom(0);
563 final int followersLastIndex = 2;
564 final int snapshotIndex = 3;
565 final int newEntryIndex = 4;
566 final int snapshotTerm = 1;
567 final int currentTerm = 2;
569 // set the snapshot variables in replicatedlog
570 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
571 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
572 actorContext.setLastApplied(3);
573 actorContext.setCommitIndex(followersLastIndex);
575 leader = new Leader(actorContext);
577 // Leader will send an immediate heartbeat - ignore it.
578 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
580 // set the snapshot as absent and check if capture-snapshot is invoked.
581 leader.setSnapshot(null);
584 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
585 new MockRaftActorContext.MockPayload("D"));
587 actorContext.getReplicatedLog().append(entry);
589 //update follower timestamp
590 leader.markFollowerActive(FOLLOWER_ID);
592 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
594 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
596 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
// Expected values mirror the setup above: lastApplied 3 / snapshotTerm 1, and the appended
// entry's index 4 / term 2.
598 assertTrue(cs.isInstallSnapshotInitiated());
599 assertEquals(3, cs.getLastAppliedIndex());
600 assertEquals(1, cs.getLastAppliedTerm());
601 assertEquals(4, cs.getLastIndex());
602 assertEquals(2, cs.getLastTerm());
604 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
605 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
607 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
// Verifies that handling a SendInstallSnapshot keeps the behavior as Leader and sends the
// follower an InstallSnapshot that has data, the snapshot's last included index and term,
// and the leader's current term.
611 public void testInstallSnapshot() throws Exception {
612 logStart("testInstallSnapshot");
614 MockRaftActorContext actorContext = createActorContextWithFollower();
616 Map<String, String> leadersSnapshot = new HashMap<>();
617 leadersSnapshot.put("1", "A");
618 leadersSnapshot.put("2", "B");
619 leadersSnapshot.put("3", "C");
622 actorContext.getReplicatedLog().removeFrom(0);
624 final int lastAppliedIndex = 3;
625 final int snapshotIndex = 2;
626 final int snapshotTerm = 1;
627 final int currentTerm = 2;
629 // set the snapshot variables in replicatedlog
630 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
631 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
632 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
633 actorContext.setCommitIndex(lastAppliedIndex);
634 actorContext.setLastApplied(lastAppliedIndex);
636 leader = new Leader(actorContext);
638 // Initial heartbeat.
639 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
641 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
642 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Snapshot built from the serialized map, with no unapplied entries and lastAppliedIndex /
// snapshotTerm used for both index/term pairs.
644 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
645 Collections.<ReplicatedLogEntry>emptyList(),
646 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
648 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
650 assertTrue(raftBehavior instanceof Leader);
652 // check if installsnapshot gets called with the correct values.
654 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
656 assertNotNull(installSnapshot.getData());
657 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
658 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
660 assertEquals(currentTerm, installSnapshot.getTerm());
// Verifies that a successful InstallSnapshotReply for the last chunk keeps the behavior as
// Leader, leaves no follower snapshot state behind (followerSnapshotSize() == 0) and sets the
// follower's match index to the snapshot's index (commitIndex) and next index to commitIndex + 1.
664 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
665 logStart("testHandleInstallSnapshotReplyLastChunk");
667 MockRaftActorContext actorContext = createActorContextWithFollower();
669 final int commitIndex = 3;
670 final int snapshotIndex = 2;
671 final int snapshotTerm = 1;
672 final int currentTerm = 2;
674 actorContext.setCommitIndex(commitIndex);
676 leader = new Leader(actorContext);
678 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
679 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
681 // Ignore initial heartbeat.
682 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
684 Map<String, String> leadersSnapshot = new HashMap<>();
685 leadersSnapshot.put("1", "A");
686 leadersSnapshot.put("2", "B");
687 leadersSnapshot.put("3", "C");
689 // set the snapshot variables in replicatedlog
691 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
692 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
693 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
695 ByteString bs = toByteString(leadersSnapshot);
696 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
697 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
698 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
699 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
// Advance the follower's snapshot state until its current chunk index is the last chunk.
700 while(!fts.isLastChunk(fts.getChunkIndex())) {
702 fts.incrementChunkIndex();
706 actorContext.getReplicatedLog().removeFrom(0);
// Simulate the follower acknowledging the last chunk.
708 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
709 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
711 assertTrue(raftBehavior instanceof Leader);
713 assertEquals(0, leader.followerSnapshotSize());
714 assertEquals(1, leader.followerLogSize());
715 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
717 assertEquals(commitIndex, fli.getMatchIndex());
718 assertEquals(commitIndex + 1, fli.getNextIndex());
// Verifies chunked snapshot transfer driven by InstallSnapshotReply messages: each successful
// reply makes the leader send the next chunk (1, 2, 3 of 3), and a reply to the final chunk
// must not trigger any further InstallSnapshot.
722 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
723 logStart("testSendSnapshotfromInstallSnapshotReply");
725 MockRaftActorContext actorContext = createActorContextWithFollower();
727 final int commitIndex = 3;
728 final int snapshotIndex = 2;
729 final int snapshotTerm = 1;
730 final int currentTerm = 2;
// Config with an overridden snapshot chunk size; the assertions below expect 3 chunks in total.
732 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
734 public int getSnapshotChunkSize() {
// Long heartbeat / isolated-leader intervals so that only the replies below drive the sends.
738 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
739 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
741 actorContext.setConfigParams(configParams);
742 actorContext.setCommitIndex(commitIndex);
744 leader = new Leader(actorContext);
746 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
747 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
749 Map<String, String> leadersSnapshot = new HashMap<>();
750 leadersSnapshot.put("1", "A");
751 leadersSnapshot.put("2", "B");
752 leadersSnapshot.put("3", "C");
754 // set the snapshot variables in replicatedlog
755 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
756 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
757 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
759 ByteString bs = toByteString(leadersSnapshot);
760 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
761 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
762 leader.setSnapshot(snapshot);
764 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
766 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
768 assertEquals(1, installSnapshot.getChunkIndex());
769 assertEquals(3, installSnapshot.getTotalChunks());
771 followerActor.underlyingActor().clear();
772 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
773 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
775 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
777 assertEquals(2, installSnapshot.getChunkIndex());
778 assertEquals(3, installSnapshot.getTotalChunks());
780 followerActor.underlyingActor().clear();
781 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
782 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
784 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
// The third message must be the final chunk; previously it was fetched but never checked.
assertEquals(3, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
786 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
787 followerActor.underlyingActor().clear();
788 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
789 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
// getFirstMatching (unlike expectFirstMatching) is used so that absence can be asserted.
791 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
793 Assert.assertNull(installSnapshot);
// Verifies recovery from a failed InstallSnapshotReply that carries chunk index -1: after
// the leader has sent chunk 1 of 3 and received that reply, the next SendHeartBeat must make
// it send chunk 1 of 3 again.
798 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
799 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
801 MockRaftActorContext actorContext = createActorContextWithFollower();
803 final int commitIndex = 3;
804 final int snapshotIndex = 2;
805 final int snapshotTerm = 1;
806 final int currentTerm = 2;
// Config with an overridden snapshot chunk size; the assertions below expect 3 chunks in total.
808 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
810 public int getSnapshotChunkSize() {
815 actorContext.setCommitIndex(commitIndex);
817 leader = new Leader(actorContext);
819 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
820 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
822 Map<String, String> leadersSnapshot = new HashMap<>();
823 leadersSnapshot.put("1", "A");
824 leadersSnapshot.put("2", "B");
825 leadersSnapshot.put("3", "C");
827 // set the snapshot variables in replicatedlog
828 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
829 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
830 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
832 ByteString bs = toByteString(leadersSnapshot);
833 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
834 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
835 leader.setSnapshot(snapshot);
// NOTE(review): the reason for this fixed one-second wait is not visible in this test - confirm.
837 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
838 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
840 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
842 assertEquals(1, installSnapshot.getChunkIndex());
843 assertEquals(3, installSnapshot.getTotalChunks());
845 followerActor.underlyingActor().clear();
// Simulate the follower rejecting the chunk: success flag false, chunk index -1.
847 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
848 FOLLOWER_ID, -1, false));
850 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
851 TimeUnit.MILLISECONDS);
853 leader.handleMessage(leaderActor, new SendHeartBeat());
855 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
857 assertEquals(1, installSnapshot.getChunkIndex());
858 assertEquals(3, installSnapshot.getTotalChunks());
// Verifies the "last chunk hash code" carried by InstallSnapshot messages: the first chunk
// carries AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, and the second chunk carries the hash
// code of the first chunk's data.
862 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
863 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
865 MockRaftActorContext actorContext = createActorContextWithFollower();
867 final int commitIndex = 3;
868 final int snapshotIndex = 2;
869 final int snapshotTerm = 1;
870 final int currentTerm = 2;
// NOTE(review): the body of this override is not visible here; the "3 total chunks"
// expectations below depend on the chunk size it returns - confirm.
872 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
874 public int getSnapshotChunkSize() {
879 actorContext.setCommitIndex(commitIndex);
881 leader = new Leader(actorContext);
// Put the follower at matchIndex -1 / nextIndex 0 before the snapshot is sent.
883 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
884 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Three-entry map that is serialized below and used as the snapshot payload.
886 Map<String, String> leadersSnapshot = new HashMap<>();
887 leadersSnapshot.put("1", "A");
888 leadersSnapshot.put("2", "B");
889 leadersSnapshot.put("3", "C");
891 // set the snapshot index and term in the replicated log
892 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
893 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
894 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
896 ByteString bs = toByteString(leadersSnapshot);
897 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
898 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
899 leader.setSnapshot(snapshot);
901 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
// First chunk: index 1 of 3, carrying the initial hash-code constant.
903 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
905 assertEquals(1, installSnapshot.getChunkIndex());
906 assertEquals(3, installSnapshot.getTotalChunks());
907 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
// Remember the hash of the first chunk's data; the second chunk is expected to report it.
909 int hashCode = installSnapshot.getData().hashCode();
911 followerActor.underlyingActor().clear();
// Acknowledge chunk 1 as successful so that the leader sends the next chunk.
913 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
914 FOLLOWER_ID, 1, true));
916 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
918 assertEquals(2, installSnapshot.getChunkIndex());
919 assertEquals(3, installSnapshot.getTotalChunks());
920 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
// Exercises FollowerToSnapshot chunking directly: walks the serialized snapshot in steps of 50
// bytes and checks each chunk's size and index, then the total chunk count.
// NOTE(review): the declarations and updates of `chunkIndex` and `j`, and the body of the
// chunk-size override, are not visible here - confirm them before changing this test.
924 public void testFollowerToSnapshotLogic() {
925 logStart("testFollowerToSnapshotLogic");
927 MockRaftActorContext actorContext = createActorContext();
929 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
931 public int getSnapshotChunkSize() {
936 leader = new Leader(actorContext);
// Three-entry map that is serialized below and used as the snapshot payload.
938 Map<String, String> leadersSnapshot = new HashMap<>();
939 leadersSnapshot.put("1", "A");
940 leadersSnapshot.put("2", "B");
941 leadersSnapshot.put("3", "C");
943 ByteString bs = toByteString(leadersSnapshot);
944 byte[] barray = bs.toByteArray();
// FollowerToSnapshot is an inner class of the leader, hence the `leader.new` form.
946 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
947 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
949 assertEquals(bs.size(), barray.length);
// Step through the byte array 50 bytes at a time; the step presumably mirrors the
// overridden chunk size - confirm.
952 for (int i=0; i < barray.length; i = i + 50) {
956 if (i + 50 > barray.length) {
960 ByteString chunk = fts.getNextChunk();
961 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
962 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
// Mark the chunk as sent and advance, unless this was the last chunk.
964 fts.markSendStatus(true);
965 if (!fts.isLastChunk(chunkIndex)) {
966 fts.incrementChunkIndex();
970 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
// Supplies the behavior under test: a new Leader built on the given context.
973 @Override protected RaftActorBehavior createBehavior(
974 RaftActorContext actorContext) {
975 return new Leader(actorContext);
// Creates a context backed by leaderActor; delegates to the ActorRef overload.
979 protected MockRaftActorContext createActorContext() {
980 return createActorContext(leaderActor);
// Creates a context with id LEADER_ID for the given actor; delegates to the (id, actor) overload.
984 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
985 return createActorContext(LEADER_ID, actorRef);
// Creates the default leader context and registers a single peer: FOLLOWER_ID mapped to the
// path of followerActor.
// NOTE(review): the return statement is not visible here; presumably returns actorContext.
988 private MockRaftActorContext createActorContextWithFollower() {
989 MockRaftActorContext actorContext = createActorContext();
990 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
991 followerActor.path().toString()).build());
// Builds a MockRaftActorContext for the given id and actor with a 50 ms heartbeat interval,
// an election timeout factor of 100000 and the test's payloadVersion.
// NOTE(review): the return statement is not visible here; presumably returns context.
995 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
996 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
997 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
998 configParams.setElectionTimeoutFactor(100000);
999 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1000 context.setConfigParams(configParams);
1001 context.setPayloadVersion(payloadVersion);
// Creates a context for FOLLOWER_ID backed by followerActor whose only peer is the leader
// (LEADER_ID mapped to the path of leaderActor).
1005 private MockRaftActorContext createFollowerActorContextWithLeader() {
1006 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Replaces the config installed by createActorContext with a fresh one on which only the
// election timeout factor is set, so the 50 ms heartbeat configured there is not carried over.
1007 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1008 followerConfig.setElectionTimeoutFactor(10000);
1009 followerActorContext.setConfigParams(followerConfig);
1010 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1011 return followerActorContext;
// Creates a leader whose commit index (1) is lower than the last index of its log, with a
// follower holding the same log and commit index, and checks the initial AppendEntries sent to
// the follower and the AppendEntriesReply collected by the leader actor.
1015 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1016 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1018 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1020 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Let the follower actor process incoming messages with a real Follower behavior.
1022 Follower follower = new Follower(followerActorContext);
1023 followerActor.underlyingActor().setBehavior(follower);
1025 Map<String, String> peerAddresses = new HashMap<>();
1026 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1028 leaderActorContext.setPeerAddresses(peerAddresses);
1030 leaderActorContext.getReplicatedLog().removeFrom(0);
// Replace the leader's log with entries built by createEntries(0, 3, 1).
1033 leaderActorContext.setReplicatedLog(
1034 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1036 leaderActorContext.setCommitIndex(1);
1038 followerActorContext.getReplicatedLog().removeFrom(0);
1040 // follower too has the exact same log entries and has the same commit index
1041 followerActorContext.setReplicatedLog(
1042 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1044 followerActorContext.setCommitIndex(1);
1046 leader = new Leader(leaderActorContext);
// Initial AppendEntries: leader commit 1, no entries, previous log index 0.
1048 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1050 assertEquals(1, appendEntries.getLeaderCommit());
1051 assertEquals(0, appendEntries.getEntries().size());
1052 assertEquals(0, appendEntries.getPrevLogIndex());
// The follower's reply reports last log index 2 and last log term 1.
1054 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1055 leaderActor, AppendEntriesReply.class);
1057 assertEquals(2, appendEntriesReply.getLogLastIndex());
1058 assertEquals(1, appendEntriesReply.getLogLastTerm());
1060 // NOTE(review): the two assertions below repeat the last-index/last-term checks just above; they do not check a "next index"
1061 assertEquals(2, appendEntriesReply.getLogLastIndex());
1062 assertEquals(1, appendEntriesReply.getLogLastTerm());
// Creates a leader with commit index 1 and a follower with the same log but commit index 2.
// Checks the initial heartbeat, feeds the follower's reply to the leader, and then checks that
// the next heartbeat carries leader commit 2 and previous log index 2.
1068 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1069 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1071 MockRaftActorContext leaderActorContext = createActorContext();
1073 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1074 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
// Let the follower actor process incoming messages with a real Follower behavior.
1076 Follower follower = new Follower(followerActorContext);
1077 followerActor.underlyingActor().setBehavior(follower);
1079 Map<String, String> leaderPeerAddresses = new HashMap<>();
1080 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1082 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1084 leaderActorContext.getReplicatedLog().removeFrom(0);
1086 leaderActorContext.setReplicatedLog(
1087 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1089 leaderActorContext.setCommitIndex(1);
1091 followerActorContext.getReplicatedLog().removeFrom(0);
1093 followerActorContext.setReplicatedLog(
1094 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1096 // follower has the same log entries but its commit index is greater than the leader's commit index
1097 followerActorContext.setCommitIndex(2);
1099 leader = new Leader(leaderActorContext);
1101 // Initial heartbeat
1102 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1104 assertEquals(1, appendEntries.getLeaderCommit());
1105 assertEquals(0, appendEntries.getEntries().size());
1106 assertEquals(0, appendEntries.getPrevLogIndex());
1108 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1109 leaderActor, AppendEntriesReply.class);
1111 assertEquals(2, appendEntriesReply.getLogLastIndex());
1112 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): this installs the Follower behavior on the leader actor, whereas the
// testHandleAppendEntriesReplyFailure* tests in this file install `leader` at the equivalent
// step - confirm that this is intended.
1114 leaderActor.underlyingActor().setBehavior(follower);
1115 leader.handleMessage(followerActor, appendEntriesReply);
// Discard everything collected so far so that only the next heartbeat exchange is inspected.
1117 leaderActor.underlyingActor().clear();
1118 followerActor.underlyingActor().clear();
// Wait one heartbeat interval, then hand the leader a SendHeartBeat message directly.
1120 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1121 TimeUnit.MILLISECONDS);
1123 leader.handleMessage(leaderActor, new SendHeartBeat());
// The second heartbeat must now report leader commit 2 and previous log index 2.
1125 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1127 assertEquals(2, appendEntries.getLeaderCommit());
1128 assertEquals(0, appendEntries.getEntries().size());
1129 assertEquals(2, appendEntries.getPrevLogIndex());
1131 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1133 assertEquals(2, appendEntriesReply.getLogLastIndex());
1134 assertEquals(1, appendEntriesReply.getLogLastTerm());
// The follower's commit index stays at 2.
1136 assertEquals(2, followerActorContext.getCommitIndex());
// Leader log: createEntries(0, 3, 1) with commit index 2; follower log: createEntries(0, 1, 1)
// with commit index 0. After the leader handles the follower's reply to the initial
// AppendEntries, it must send entries 1 and 2, and the follower must apply both.
1142 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1143 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1145 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// A 1000-second heartbeat interval - presumably so that no scheduled heartbeat interferes
// with the exact message counts asserted below.
1146 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1147 new FiniteDuration(1000, TimeUnit.SECONDS));
1149 leaderActorContext.setReplicatedLog(
1150 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1151 long leaderCommitIndex = 2;
1152 leaderActorContext.setCommitIndex(leaderCommitIndex);
1153 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the leader's entries at indexes 1 and 2 for the data comparisons below.
1155 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1156 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1158 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1160 followerActorContext.setReplicatedLog(
1161 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1162 followerActorContext.setCommitIndex(0);
1163 followerActorContext.setLastApplied(0);
1165 Follower follower = new Follower(followerActorContext);
1166 followerActor.underlyingActor().setBehavior(follower);
1168 leader = new Leader(leaderActorContext);
// Capture the initial exchange, then clear both collectors before the next round.
1170 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1171 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1173 MessageCollectorActor.clearMessages(followerActor);
1174 MessageCollectorActor.clearMessages(leaderActor);
1176 // Verify initial AppendEntries sent with the leader's current commit index.
1177 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1178 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1179 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
// From here on the leader actor processes incoming messages with the Leader behavior.
1181 leaderActor.underlyingActor().setBehavior(leader);
1183 leader.handleMessage(followerActor, appendEntriesReply);
// Expect exactly one further reply at the leader and a new AppendEntries at the follower.
1185 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1186 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1188 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1189 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1190 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1192 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1193 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1194 appendEntries.getEntries().get(0).getData());
1195 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1196 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1197 appendEntries.getEntries().get(1).getData());
// The leader's bookkeeping for the follower must now point at index 3.
1199 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1200 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
// The follower must have produced two ApplyState messages, for entries 1 and 2.
1202 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1204 ApplyState applyState = applyStateList.get(0);
1205 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1206 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1207 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1208 applyState.getReplicatedLogEntry().getData());
1210 applyState = applyStateList.get(1);
1211 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1212 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1213 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1214 applyState.getReplicatedLogEntry().getData());
1216 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1217 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
// Leader log: createEntries(0, 2, 1) with commit index 1; the follower starts with an empty
// log and commit index -1. After the leader handles the follower's reply to the initial
// AppendEntries, it must send entries 0 and 1, and the follower must apply both.
1221 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1222 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1224 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// A 1000-second heartbeat interval - presumably so that no scheduled heartbeat interferes
// with the exact message counts asserted below.
1225 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1226 new FiniteDuration(1000, TimeUnit.SECONDS));
1228 leaderActorContext.setReplicatedLog(
1229 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1230 long leaderCommitIndex = 1;
1231 leaderActorContext.setCommitIndex(leaderCommitIndex);
1232 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the leader's entries at indexes 0 and 1 for the data comparisons below.
1234 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1235 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1237 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// The builder is used without createEntries here, i.e. no entries are added to the follower's log.
1239 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1240 followerActorContext.setCommitIndex(-1);
1241 followerActorContext.setLastApplied(-1);
1243 Follower follower = new Follower(followerActorContext);
1244 followerActor.underlyingActor().setBehavior(follower);
1246 leader = new Leader(leaderActorContext);
// Capture the initial exchange, then clear both collectors before the next round.
1248 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1249 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1251 MessageCollectorActor.clearMessages(followerActor);
1252 MessageCollectorActor.clearMessages(leaderActor);
1254 // Verify initial AppendEntries sent with the leader's current commit index.
1255 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1256 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1257 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
// From here on the leader actor processes incoming messages with the Leader behavior.
1259 leaderActor.underlyingActor().setBehavior(leader);
1261 leader.handleMessage(followerActor, appendEntriesReply);
// Expect exactly one further reply at the leader and a new AppendEntries at the follower.
1263 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1264 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1266 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1267 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1268 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1270 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1271 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1272 appendEntries.getEntries().get(0).getData());
1273 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1274 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1275 appendEntries.getEntries().get(1).getData());
// The leader's bookkeeping for the follower must now point at index 2.
1277 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1278 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
// The follower must have produced two ApplyState messages, for entries 0 and 1.
1280 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1282 ApplyState applyState = applyStateList.get(0);
1283 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1284 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1285 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1286 applyState.getReplicatedLogEntry().getData());
1288 applyState = applyStateList.get(1);
1289 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1290 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1291 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1292 applyState.getReplicatedLogEntry().getData());
1294 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1295 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
// Leader log: createEntries(0, 2, 2) with commit index 1; follower log: createEntries(0, 1, 1)
// with commit index -1. After the leader handles the follower's reply to the initial
// AppendEntries, it must send entries 0 and 1 (both term 2), and the follower must end up with
// last index 1 and last term 2.
1299 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1300 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1302 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// A 1000-second heartbeat interval - presumably so that no scheduled heartbeat interferes
// with the exact message counts asserted below.
1303 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1304 new FiniteDuration(1000, TimeUnit.SECONDS));
1306 leaderActorContext.setReplicatedLog(
1307 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1308 long leaderCommitIndex = 1;
1309 leaderActorContext.setCommitIndex(leaderCommitIndex);
1310 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the leader's entries at indexes 0 and 1 for the data comparisons below.
1312 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1313 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1315 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1317 followerActorContext.setReplicatedLog(
1318 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1319 followerActorContext.setCommitIndex(-1);
1320 followerActorContext.setLastApplied(-1);
1322 Follower follower = new Follower(followerActorContext);
1323 followerActor.underlyingActor().setBehavior(follower);
1325 leader = new Leader(leaderActorContext);
// Capture the initial exchange, then clear both collectors before the next round.
1327 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1328 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1330 MessageCollectorActor.clearMessages(followerActor);
1331 MessageCollectorActor.clearMessages(leaderActor);
1333 // Verify initial AppendEntries sent with the leader's current commit index.
1334 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1335 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1336 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
// From here on the leader actor processes incoming messages with the Leader behavior.
1338 leaderActor.underlyingActor().setBehavior(leader);
1340 leader.handleMessage(followerActor, appendEntriesReply);
// Expect exactly one further reply at the leader and a new AppendEntries at the follower.
1342 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1343 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1345 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1346 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1347 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1349 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1350 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1351 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1352 appendEntries.getEntries().get(0).getData());
1353 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1354 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1355 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1356 appendEntries.getEntries().get(1).getData());
// The leader's bookkeeping for the follower must now point at index 2.
1358 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1359 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
// The follower must have produced two ApplyState messages, both for term-2 entries.
1361 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1363 ApplyState applyState = applyStateList.get(0);
1364 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1365 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1366 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1367 applyState.getReplicatedLogEntry().getData());
1369 applyState = applyStateList.get(1);
1370 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1371 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1372 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1373 applyState.getReplicatedLogEntry().getData());
1375 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1376 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1377 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
// Feeds the leader a successful AppendEntriesReply (log last index 2, term 1) and checks that
// the leader stays Leader, advances commit index and last applied to 2, emits
// ApplyJournalEntries and one ApplyState for index 2, and records the follower's payload version.
1381 public void testHandleAppendEntriesReplySuccess() throws Exception {
1382 logStart("testHandleAppendEntriesReplySuccess");
1384 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1386 leaderActorContext.setReplicatedLog(
1387 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1389 leaderActorContext.setCommitIndex(1);
1390 leaderActorContext.setLastApplied(1);
1391 leaderActorContext.getTermInformation().update(1, "leader");
1393 leader = new Leader(leaderActorContext);
// At this point payloadVersion still refers to the identically named member declared
// outside this method; the local declared below shadows it from there on.
1395 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1397 short payloadVersion = 5;
1398 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1400 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1402 assertEquals(RaftState.Leader, raftActorBehavior.state());
1404 assertEquals(2, leaderActorContext.getCommitIndex());
1406 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1407 leaderActor, ApplyJournalEntries.class);
1409 assertEquals(2, leaderActorContext.getLastApplied());
1411 assertEquals(2, applyJournalEntries.getToIndex());
// NOTE(review): the second argument of this getAllMatching call is not visible here;
// presumably ApplyState.class - confirm.
1413 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1416 assertEquals(1,applyStateList.size());
1418 ApplyState applyState = applyStateList.get(0);
1420 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
// The payload version from the reply (the local value 5) must be stored for the follower.
1422 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1423 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
// Checks that an AppendEntriesReply naming a follower id that was never registered as a peer
// leaves the leader in the Leader state.
1427 public void testHandleAppendEntriesReplyUnknownFollower(){
1428 logStart("testHandleAppendEntriesReplyUnknownFollower");
// The context is created without any peer addresses.
1430 MockRaftActorContext leaderActorContext = createActorContext();
1432 leader = new Leader(leaderActorContext);
1434 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1436 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1438 assertEquals(RaftState.Leader, raftActorBehavior.state());
// Leader log: createEntries(0, 4, 1) with commit index 3 and snapshot chunk size 2; the
// follower starts with an empty log. After the leader handles the follower's reply to the
// initial AppendEntries, the follower must receive two AppendEntries messages of two entries
// each (indexes 0-1, then 2-3) and apply all four entries.
1442 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1443 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1445 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// A 1000-second heartbeat interval - presumably so that no scheduled heartbeat interferes
// with the exact message counts asserted below.
1446 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1447 new FiniteDuration(1000, TimeUnit.SECONDS));
// NOTE(review): per the test name this small chunk size is what caps the data carried by a
// single AppendEntries; the link is not visible in this method - confirm in the leader code.
1448 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2);
1450 leaderActorContext.setReplicatedLog(
1451 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1452 long leaderCommitIndex = 3;
1453 leaderActorContext.setCommitIndex(leaderCommitIndex);
1454 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the leader's entries at indexes 0..3 for the data comparisons below.
1456 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1457 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1458 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1459 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1461 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// The builder is used without createEntries here, i.e. no entries are added to the follower's log.
1463 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1464 followerActorContext.setCommitIndex(-1);
1465 followerActorContext.setLastApplied(-1);
1467 Follower follower = new Follower(followerActorContext);
1468 followerActor.underlyingActor().setBehavior(follower);
1470 leader = new Leader(leaderActorContext);
// Capture the initial exchange, then clear both collectors before the next round.
1472 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1473 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1475 MessageCollectorActor.clearMessages(followerActor);
1476 MessageCollectorActor.clearMessages(leaderActor);
1478 // Verify initial AppendEntries sent with the leader's current commit index.
1479 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1480 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1481 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
// From here on the leader actor processes incoming messages with the Leader behavior.
1483 leaderActor.underlyingActor().setBehavior(leader);
1485 leader.handleMessage(followerActor, appendEntriesReply);
// Two AppendEntries must reach the follower and two replies must reach the leader.
1487 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1488 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
// First message: entries 0 and 1.
1490 appendEntries = appendEntriesList.get(0);
1491 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1492 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1493 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1495 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1496 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1497 appendEntries.getEntries().get(0).getData());
1498 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1499 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1500 appendEntries.getEntries().get(1).getData());
// Second message: entries 2 and 3; its previous log index is also asserted to be -1.
1502 appendEntries = appendEntriesList.get(1);
1503 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1504 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1505 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1507 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1508 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1509 appendEntries.getEntries().get(0).getData());
1510 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1511 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1512 appendEntries.getEntries().get(1).getData());
// The leader's bookkeeping for the follower must now point at index 4.
1514 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1515 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
// The follower must have produced four ApplyState messages.
1517 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1519 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1520 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
// Checks that the leader remains in the Leader state after handling a RequestVoteReply,
// both for a granted vote (true) and for a denied vote (false).
1524 public void testHandleRequestVoteReply(){
1525 logStart("testHandleRequestVoteReply");
1527 MockRaftActorContext leaderActorContext = createActorContext();
1529 leader = new Leader(leaderActorContext);
1531 // Should be a no-op.
1532 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1533 new RequestVoteReply(1, true));
1535 assertEquals(RaftState.Leader, raftActorBehavior.state());
1537 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1539 assertEquals(RaftState.Leader, raftActorBehavior.state());
// Checks that an IsolatedLeaderCheck handled by a leader whose context has no peers
// returns a behavior that is still a Leader.
1543 public void testIsolatedLeaderCheckNoFollowers() {
1544 logStart("testIsolatedLeaderCheckNoFollowers");
// The context is created without any peer addresses.
1546 MockRaftActorContext leaderActorContext = createActorContext();
1548 leader = new Leader(leaderActorContext);
1549 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1550 Assert.assertTrue(behavior instanceof Leader);
1554 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1555 logStart("testIsolatedLeaderCheckTwoFollowers");
1557 new JavaTestKit(getSystem()) {{
1559 ActorRef followerActor1 = getTestActor();
1560 ActorRef followerActor2 = getTestActor();
1562 MockRaftActorContext leaderActorContext = createActorContext();
1564 Map<String, String> peerAddresses = new HashMap<>();
1565 peerAddresses.put("follower-1", followerActor1.path().toString());
1566 peerAddresses.put("follower-2", followerActor2.path().toString());
1568 leaderActorContext.setPeerAddresses(peerAddresses);
1570 leader = new Leader(leaderActorContext);
1572 leader.markFollowerActive("follower-1");
1573 leader.markFollowerActive("follower-2");
1574 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1575 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1576 behavior instanceof Leader);
1578 // kill 1 follower and verify if that got killed
1579 final JavaTestKit probe = new JavaTestKit(getSystem());
1580 probe.watch(followerActor1);
1581 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1582 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1583 assertEquals(termMsg1.getActor(), followerActor1);
1585 leader.markFollowerInActive("follower-1");
1586 leader.markFollowerActive("follower-2");
1587 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1588 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1589 behavior instanceof Leader);
1591 // kill 2nd follower and leader should change to Isolated leader
1592 followerActor2.tell(PoisonPill.getInstance(), null);
1593 probe.watch(followerActor2);
1594 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1595 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1596 assertEquals(termMsg2.getActor(), followerActor2);
1598 leader.markFollowerInActive("follower-2");
1599 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1600 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1601 behavior instanceof IsolatedLeader);
1606 public void testLaggingFollowerStarvation() throws Exception {
1607 logStart("testLaggingFollowerStarvation");
1608 new JavaTestKit(getSystem()) {{
1609 String leaderActorId = actorFactory.generateActorId("leader");
1610 String follower1ActorId = actorFactory.generateActorId("follower");
1611 String follower2ActorId = actorFactory.generateActorId("follower");
1613 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1614 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1615 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1616 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1618 MockRaftActorContext leaderActorContext =
1619 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1621 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1622 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1623 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1625 leaderActorContext.setConfigParams(configParams);
1627 leaderActorContext.setReplicatedLog(
1628 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1630 Map<String, String> peerAddresses = new HashMap<>();
1631 peerAddresses.put(follower1ActorId,
1632 follower1Actor.path().toString());
1633 peerAddresses.put(follower2ActorId,
1634 follower2Actor.path().toString());
1636 leaderActorContext.setPeerAddresses(peerAddresses);
1637 leaderActorContext.getTermInformation().update(1, leaderActorId);
1639 RaftActorBehavior leader = createBehavior(leaderActorContext);
1641 leaderActor.underlyingActor().setBehavior(leader);
1643 for(int i=1;i<6;i++) {
1644 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1645 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1646 assertTrue(newBehavior == leader);
1647 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1650 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1651 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1653 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1654 heartbeats.size() > 1);
1656 // Check if follower-2 got AppendEntries during this time and was not starved
1657 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1659 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1660 appendEntries.size() > 1);
1666 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1667 ActorRef actorRef, RaftRPC rpc) throws Exception {
1668 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1669 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1672 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1674 private final long electionTimeOutIntervalMillis;
1675 private final int snapshotChunkSize;
1677 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1679 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1680 this.snapshotChunkSize = snapshotChunkSize;
1684 public FiniteDuration getElectionTimeOutInterval() {
1685 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1689 public int getSnapshotChunkSize() {
1690 return snapshotChunkSize;