1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.collect.ImmutableMap;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.util.Collections;
16 import java.util.HashMap;
17 import java.util.List;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.Snapshot;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
33 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
34 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
43 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
44 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
45 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
46 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
47 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
48 import scala.concurrent.duration.FiniteDuration;
50 public class LeaderTest extends AbstractLeaderTest {
52 static final String FOLLOWER_ID = "follower";
53 public static final String LEADER_ID = "leader";
55 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
56 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
58 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
59 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
61 private Leader leader;
62 private final short payloadVersion = 5;
66 public void tearDown() throws Exception {
75 public void testHandleMessageForUnknownMessage() throws Exception {
76 logStart("testHandleMessageForUnknownMessage");
78 leader = new Leader(createActorContext());
80 // handle message should return the Leader state when it receives an
82 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
83 Assert.assertTrue(behavior instanceof Leader);
87 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
88 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
90 MockRaftActorContext actorContext = createActorContextWithFollower();
91 short payloadVersion = (short)5;
92 actorContext.setPayloadVersion(payloadVersion);
95 actorContext.getTermInformation().update(term, "");
97 leader = new Leader(actorContext);
99 // Leader should send an immediate heartbeat with no entries as follower is inactive.
100 long lastIndex = actorContext.getReplicatedLog().lastIndex();
101 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
102 assertEquals("getTerm", term, appendEntries.getTerm());
103 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
104 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
105 assertEquals("Entries size", 0, appendEntries.getEntries().size());
106 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
108 // The follower would normally reply - simulate that explicitly here.
109 leader.handleMessage(followerActor, new AppendEntriesReply(
110 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
111 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
113 followerActor.underlyingActor().clear();
115 // Sleep for the heartbeat interval so AppendEntries is sent.
116 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
117 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
119 leader.handleMessage(leaderActor, new SendHeartBeat());
121 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
122 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
123 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
124 assertEquals("Entries size", 1, appendEntries.getEntries().size());
125 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
126 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
127 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
131 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
132 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
133 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
135 actorContext.getReplicatedLog().append(newEntry);
136 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
140 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
141 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
143 MockRaftActorContext actorContext = createActorContextWithFollower();
146 actorContext.getTermInformation().update(term, "");
148 leader = new Leader(actorContext);
150 // Leader will send an immediate heartbeat - ignore it.
151 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
153 // The follower would normally reply - simulate that explicitly here.
154 long lastIndex = actorContext.getReplicatedLog().lastIndex();
155 leader.handleMessage(followerActor, new AppendEntriesReply(
156 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
157 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
159 followerActor.underlyingActor().clear();
161 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
163 // State should not change
164 assertTrue(raftBehavior instanceof Leader);
166 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
167 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
168 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
169 assertEquals("Entries size", 1, appendEntries.getEntries().size());
170 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
171 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
172 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
173 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
177 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
178 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
180 MockRaftActorContext actorContext = createActorContextWithFollower();
181 actorContext.setRaftPolicy(createRaftPolicy(true, true));
184 actorContext.getTermInformation().update(term, "");
186 leader = new Leader(actorContext);
188 // Leader will send an immediate heartbeat - ignore it.
189 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
191 // The follower would normally reply - simulate that explicitly here.
192 long lastIndex = actorContext.getReplicatedLog().lastIndex();
193 leader.handleMessage(followerActor, new AppendEntriesReply(
194 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
195 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
197 followerActor.underlyingActor().clear();
199 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
201 // State should not change
202 assertTrue(raftBehavior instanceof Leader);
204 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
205 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
206 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
207 assertEquals("Entries size", 1, appendEntries.getEntries().size());
208 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
209 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
210 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
211 assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
215 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
216 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
218 MockRaftActorContext actorContext = createActorContextWithFollower();
219 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
221 public FiniteDuration getHeartBeatInterval() {
222 return FiniteDuration.apply(5, TimeUnit.SECONDS);
227 actorContext.getTermInformation().update(term, "");
229 leader = new Leader(actorContext);
231 // Leader will send an immediate heartbeat - ignore it.
232 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
234 // The follower would normally reply - simulate that explicitly here.
235 long lastIndex = actorContext.getReplicatedLog().lastIndex();
236 leader.handleMessage(followerActor, new AppendEntriesReply(
237 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
238 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
240 followerActor.underlyingActor().clear();
242 for(int i=0;i<5;i++) {
243 sendReplicate(actorContext, lastIndex+i+1);
246 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
247 // We expect only 1 message to be sent because of two reasons,
248 // - an append entries reply was not received
249 // - the heartbeat interval has not expired
250 // In this scenario if multiple messages are sent they would likely be duplicates
251 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
255 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
256 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
258 MockRaftActorContext actorContext = createActorContextWithFollower();
259 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
261 public FiniteDuration getHeartBeatInterval() {
262 return FiniteDuration.apply(5, TimeUnit.SECONDS);
267 actorContext.getTermInformation().update(term, "");
269 leader = new Leader(actorContext);
271 // Leader will send an immediate heartbeat - ignore it.
272 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
274 // The follower would normally reply - simulate that explicitly here.
275 long lastIndex = actorContext.getReplicatedLog().lastIndex();
276 leader.handleMessage(followerActor, new AppendEntriesReply(
277 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
278 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
280 followerActor.underlyingActor().clear();
282 for(int i=0;i<3;i++) {
283 sendReplicate(actorContext, lastIndex+i+1);
284 leader.handleMessage(followerActor, new AppendEntriesReply(
285 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
289 for(int i=3;i<5;i++) {
290 sendReplicate(actorContext, lastIndex + i + 1);
293 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
294 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
295 // get sent to the follower - but not the 5th
296 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
298 for(int i=0;i<4;i++) {
299 long expected = allMessages.get(i).getEntries().get(0).getIndex();
300 assertEquals(expected, i+2);
305 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
306 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
308 MockRaftActorContext actorContext = createActorContextWithFollower();
309 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
311 public FiniteDuration getHeartBeatInterval() {
312 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
317 actorContext.getTermInformation().update(term, "");
319 leader = new Leader(actorContext);
321 // Leader will send an immediate heartbeat - ignore it.
322 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
324 // The follower would normally reply - simulate that explicitly here.
325 long lastIndex = actorContext.getReplicatedLog().lastIndex();
326 leader.handleMessage(followerActor, new AppendEntriesReply(
327 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
328 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
330 followerActor.underlyingActor().clear();
332 sendReplicate(actorContext, lastIndex+1);
334 // Wait slightly longer than heartbeat duration
335 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
337 leader.handleMessage(leaderActor, new SendHeartBeat());
339 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
340 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
342 assertEquals(1, allMessages.get(0).getEntries().size());
343 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
344 assertEquals(1, allMessages.get(1).getEntries().size());
345 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
350 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
351 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
353 MockRaftActorContext actorContext = createActorContextWithFollower();
354 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
356 public FiniteDuration getHeartBeatInterval() {
357 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
362 actorContext.getTermInformation().update(term, "");
364 leader = new Leader(actorContext);
366 // Leader will send an immediate heartbeat - ignore it.
367 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
369 // The follower would normally reply - simulate that explicitly here.
370 long lastIndex = actorContext.getReplicatedLog().lastIndex();
371 leader.handleMessage(followerActor, new AppendEntriesReply(
372 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
373 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
375 followerActor.underlyingActor().clear();
377 for(int i=0;i<3;i++) {
378 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
379 leader.handleMessage(leaderActor, new SendHeartBeat());
382 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
383 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
387 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
388 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
390 MockRaftActorContext actorContext = createActorContextWithFollower();
391 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
393 public FiniteDuration getHeartBeatInterval() {
394 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
399 actorContext.getTermInformation().update(term, "");
401 leader = new Leader(actorContext);
403 // Leader will send an immediate heartbeat - ignore it.
404 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
406 // The follower would normally reply - simulate that explicitly here.
407 long lastIndex = actorContext.getReplicatedLog().lastIndex();
408 leader.handleMessage(followerActor, new AppendEntriesReply(
409 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
410 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
412 followerActor.underlyingActor().clear();
414 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
415 leader.handleMessage(leaderActor, new SendHeartBeat());
416 sendReplicate(actorContext, lastIndex+1);
418 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
419 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
421 assertEquals(0, allMessages.get(0).getEntries().size());
422 assertEquals(1, allMessages.get(1).getEntries().size());
427 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
428 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
430 MockRaftActorContext actorContext = createActorContext();
432 leader = new Leader(actorContext);
434 actorContext.setLastApplied(0);
436 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
437 long term = actorContext.getTermInformation().getCurrentTerm();
438 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
439 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
441 actorContext.getReplicatedLog().append(newEntry);
443 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
444 new Replicate(leaderActor, "state-id", newEntry));
446 // State should not change
447 assertTrue(raftBehavior instanceof Leader);
449 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
451 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
452 // one since lastApplied state is 0.
453 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
454 leaderActor, ApplyState.class);
455 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
457 for(int i = 0; i <= newLogIndex - 1; i++ ) {
458 ApplyState applyState = applyStateList.get(i);
459 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
460 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
463 ApplyState last = applyStateList.get((int) newLogIndex - 1);
464 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
465 assertEquals("getIdentifier", "state-id", last.getIdentifier());
469 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
470 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
472 MockRaftActorContext actorContext = createActorContextWithFollower();
474 Map<String, String> leadersSnapshot = new HashMap<>();
475 leadersSnapshot.put("1", "A");
476 leadersSnapshot.put("2", "B");
477 leadersSnapshot.put("3", "C");
480 actorContext.getReplicatedLog().removeFrom(0);
482 final int commitIndex = 3;
483 final int snapshotIndex = 2;
484 final int newEntryIndex = 4;
485 final int snapshotTerm = 1;
486 final int currentTerm = 2;
488 // set the snapshot variables in replicatedlog
489 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
490 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
491 actorContext.setCommitIndex(commitIndex);
492 //set follower timeout to 2 mins, helps during debugging
493 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
495 leader = new Leader(actorContext);
497 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
498 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
501 ReplicatedLogImplEntry entry =
502 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
503 new MockRaftActorContext.MockPayload("D"));
505 //update follower timestamp
506 leader.markFollowerActive(FOLLOWER_ID);
508 ByteString bs = toByteString(leadersSnapshot);
509 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
510 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
511 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
512 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
514 //send first chunk and no InstallSnapshotReply received yet
516 fts.incrementChunkIndex();
518 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
519 TimeUnit.MILLISECONDS);
521 leader.handleMessage(leaderActor, new SendHeartBeat());
523 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
525 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
527 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
529 //InstallSnapshotReply received
530 fts.markSendStatus(true);
532 leader.handleMessage(leaderActor, new SendHeartBeat());
534 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
536 assertEquals(commitIndex, is.getLastIncludedIndex());
540 public void testSendAppendEntriesSnapshotScenario() throws Exception {
541 logStart("testSendAppendEntriesSnapshotScenario");
543 MockRaftActorContext actorContext = createActorContextWithFollower();
545 Map<String, String> leadersSnapshot = new HashMap<>();
546 leadersSnapshot.put("1", "A");
547 leadersSnapshot.put("2", "B");
548 leadersSnapshot.put("3", "C");
551 actorContext.getReplicatedLog().removeFrom(0);
553 final int followersLastIndex = 2;
554 final int snapshotIndex = 3;
555 final int newEntryIndex = 4;
556 final int snapshotTerm = 1;
557 final int currentTerm = 2;
559 // set the snapshot variables in replicatedlog
560 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
561 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
562 actorContext.setCommitIndex(followersLastIndex);
564 leader = new Leader(actorContext);
566 // Leader will send an immediate heartbeat - ignore it.
567 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
570 ReplicatedLogImplEntry entry =
571 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
572 new MockRaftActorContext.MockPayload("D"));
574 actorContext.getReplicatedLog().append(entry);
576 //update follower timestamp
577 leader.markFollowerActive(FOLLOWER_ID);
579 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
580 RaftActorBehavior raftBehavior = leader.handleMessage(
581 leaderActor, new Replicate(null, "state-id", entry));
583 assertTrue(raftBehavior instanceof Leader);
585 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
589 public void testInitiateInstallSnapshot() throws Exception {
590 logStart("testInitiateInstallSnapshot");
592 MockRaftActorContext actorContext = createActorContextWithFollower();
594 Map<String, String> leadersSnapshot = new HashMap<>();
595 leadersSnapshot.put("1", "A");
596 leadersSnapshot.put("2", "B");
597 leadersSnapshot.put("3", "C");
600 actorContext.getReplicatedLog().removeFrom(0);
602 final int followersLastIndex = 2;
603 final int snapshotIndex = 3;
604 final int newEntryIndex = 4;
605 final int snapshotTerm = 1;
606 final int currentTerm = 2;
608 // set the snapshot variables in replicatedlog
609 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
610 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
611 actorContext.setLastApplied(3);
612 actorContext.setCommitIndex(followersLastIndex);
614 leader = new Leader(actorContext);
616 // Leader will send an immediate heartbeat - ignore it.
617 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
619 // set the snapshot as absent and check if capture-snapshot is invoked.
620 leader.setSnapshot(null);
623 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
624 new MockRaftActorContext.MockPayload("D"));
626 actorContext.getReplicatedLog().append(entry);
628 //update follower timestamp
629 leader.markFollowerActive(FOLLOWER_ID);
631 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
633 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
635 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
637 assertTrue(cs.isInstallSnapshotInitiated());
638 assertEquals(3, cs.getLastAppliedIndex());
639 assertEquals(1, cs.getLastAppliedTerm());
640 assertEquals(4, cs.getLastIndex());
641 assertEquals(2, cs.getLastTerm());
643 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
644 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
646 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
650 public void testInstallSnapshot() throws Exception {
651 logStart("testInstallSnapshot");
653 MockRaftActorContext actorContext = createActorContextWithFollower();
655 Map<String, String> leadersSnapshot = new HashMap<>();
656 leadersSnapshot.put("1", "A");
657 leadersSnapshot.put("2", "B");
658 leadersSnapshot.put("3", "C");
661 actorContext.getReplicatedLog().removeFrom(0);
663 final int lastAppliedIndex = 3;
664 final int snapshotIndex = 2;
665 final int snapshotTerm = 1;
666 final int currentTerm = 2;
668 // set the snapshot variables in replicatedlog
669 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
670 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
671 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
672 actorContext.setCommitIndex(lastAppliedIndex);
673 actorContext.setLastApplied(lastAppliedIndex);
675 leader = new Leader(actorContext);
677 // Initial heartbeat.
678 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
680 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
681 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
683 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
684 Collections.<ReplicatedLogEntry>emptyList(),
685 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
687 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
689 assertTrue(raftBehavior instanceof Leader);
691 // check if installsnapshot gets called with the correct values.
693 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
695 assertNotNull(installSnapshot.getData());
696 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
697 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
699 assertEquals(currentTerm, installSnapshot.getTerm());
703 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
704 logStart("testHandleInstallSnapshotReplyLastChunk");
706 MockRaftActorContext actorContext = createActorContextWithFollower();
708 final int commitIndex = 3;
709 final int snapshotIndex = 2;
710 final int snapshotTerm = 1;
711 final int currentTerm = 2;
713 actorContext.setCommitIndex(commitIndex);
715 leader = new Leader(actorContext);
717 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
718 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
720 // Ignore initial heartbeat.
721 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
723 Map<String, String> leadersSnapshot = new HashMap<>();
724 leadersSnapshot.put("1", "A");
725 leadersSnapshot.put("2", "B");
726 leadersSnapshot.put("3", "C");
728 // set the snapshot variables in replicatedlog
730 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
731 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
732 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
734 ByteString bs = toByteString(leadersSnapshot);
735 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
736 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
737 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
738 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
739 while(!fts.isLastChunk(fts.getChunkIndex())) {
741 fts.incrementChunkIndex();
745 actorContext.getReplicatedLog().removeFrom(0);
747 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
748 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
750 assertTrue(raftBehavior instanceof Leader);
752 assertEquals(0, leader.followerSnapshotSize());
753 assertEquals(1, leader.followerLogSize());
754 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
756 assertEquals(commitIndex, fli.getMatchIndex());
757 assertEquals(commitIndex + 1, fli.getNextIndex());
761 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
762 logStart("testSendSnapshotfromInstallSnapshotReply");
764 MockRaftActorContext actorContext = createActorContextWithFollower();
766 final int commitIndex = 3;
767 final int snapshotIndex = 2;
768 final int snapshotTerm = 1;
769 final int currentTerm = 2;
771 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
773 public int getSnapshotChunkSize() {
777 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
778 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
780 actorContext.setConfigParams(configParams);
781 actorContext.setCommitIndex(commitIndex);
783 leader = new Leader(actorContext);
785 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
786 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
788 Map<String, String> leadersSnapshot = new HashMap<>();
789 leadersSnapshot.put("1", "A");
790 leadersSnapshot.put("2", "B");
791 leadersSnapshot.put("3", "C");
793 // set the snapshot variables in replicatedlog
794 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
795 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
796 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
798 ByteString bs = toByteString(leadersSnapshot);
799 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
800 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
801 leader.setSnapshot(snapshot);
803 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
805 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
807 assertEquals(1, installSnapshot.getChunkIndex());
808 assertEquals(3, installSnapshot.getTotalChunks());
810 followerActor.underlyingActor().clear();
811 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
812 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
814 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
816 assertEquals(2, installSnapshot.getChunkIndex());
817 assertEquals(3, installSnapshot.getTotalChunks());
819 followerActor.underlyingActor().clear();
820 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
821 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
823 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
825 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
826 followerActor.underlyingActor().clear();
827 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
828 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
830 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
832 Assert.assertNull(installSnapshot);
837 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
838 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
840 MockRaftActorContext actorContext = createActorContextWithFollower();
842 final int commitIndex = 3;
843 final int snapshotIndex = 2;
844 final int snapshotTerm = 1;
845 final int currentTerm = 2;
847 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
849 public int getSnapshotChunkSize() {
854 actorContext.setCommitIndex(commitIndex);
856 leader = new Leader(actorContext);
858 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
859 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
861 Map<String, String> leadersSnapshot = new HashMap<>();
862 leadersSnapshot.put("1", "A");
863 leadersSnapshot.put("2", "B");
864 leadersSnapshot.put("3", "C");
866 // set the snapshot variables in replicatedlog
867 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
868 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
869 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
871 ByteString bs = toByteString(leadersSnapshot);
872 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
873 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
874 leader.setSnapshot(snapshot);
876 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
877 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
879 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
881 assertEquals(1, installSnapshot.getChunkIndex());
882 assertEquals(3, installSnapshot.getTotalChunks());
884 followerActor.underlyingActor().clear();
886 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
887 FOLLOWER_ID, -1, false));
889 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
890 TimeUnit.MILLISECONDS);
892 leader.handleMessage(leaderActor, new SendHeartBeat());
894 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
896 assertEquals(1, installSnapshot.getChunkIndex());
897 assertEquals(3, installSnapshot.getTotalChunks());
901 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
902 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
904 MockRaftActorContext actorContext = createActorContextWithFollower();
906 final int commitIndex = 3;
907 final int snapshotIndex = 2;
908 final int snapshotTerm = 1;
909 final int currentTerm = 2;
911 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
913 public int getSnapshotChunkSize() {
918 actorContext.setCommitIndex(commitIndex);
920 leader = new Leader(actorContext);
922 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
923 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
925 Map<String, String> leadersSnapshot = new HashMap<>();
926 leadersSnapshot.put("1", "A");
927 leadersSnapshot.put("2", "B");
928 leadersSnapshot.put("3", "C");
930 // set the snapshot variables in replicatedlog
931 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
932 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
933 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
935 ByteString bs = toByteString(leadersSnapshot);
936 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
937 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
938 leader.setSnapshot(snapshot);
940 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
942 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
944 assertEquals(1, installSnapshot.getChunkIndex());
945 assertEquals(3, installSnapshot.getTotalChunks());
946 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
948 int hashCode = installSnapshot.getData().hashCode();
950 followerActor.underlyingActor().clear();
952 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
953 FOLLOWER_ID, 1, true));
955 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
957 assertEquals(2, installSnapshot.getChunkIndex());
958 assertEquals(3, installSnapshot.getTotalChunks());
959 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
963 public void testFollowerToSnapshotLogic() {
964 logStart("testFollowerToSnapshotLogic");
966 MockRaftActorContext actorContext = createActorContext();
968 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
970 public int getSnapshotChunkSize() {
975 leader = new Leader(actorContext);
977 Map<String, String> leadersSnapshot = new HashMap<>();
978 leadersSnapshot.put("1", "A");
979 leadersSnapshot.put("2", "B");
980 leadersSnapshot.put("3", "C");
982 ByteString bs = toByteString(leadersSnapshot);
983 byte[] barray = bs.toByteArray();
985 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
986 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
988 assertEquals(bs.size(), barray.length);
991 for (int i=0; i < barray.length; i = i + 50) {
995 if (i + 50 > barray.length) {
999 ByteString chunk = fts.getNextChunk();
1000 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
1001 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1003 fts.markSendStatus(true);
1004 if (!fts.isLastChunk(chunkIndex)) {
1005 fts.incrementChunkIndex();
1009 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1012 @Override protected RaftActorBehavior createBehavior(
1013 RaftActorContext actorContext) {
1014 return new Leader(actorContext);
1018 protected MockRaftActorContext createActorContext() {
1019 return createActorContext(leaderActor);
1023 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1024 return createActorContext(LEADER_ID, actorRef);
1027 private MockRaftActorContext createActorContextWithFollower() {
1028 MockRaftActorContext actorContext = createActorContext();
1029 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1030 followerActor.path().toString()).build());
1031 return actorContext;
1034 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1035 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1036 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1037 configParams.setElectionTimeoutFactor(100000);
1038 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1039 context.setConfigParams(configParams);
1040 context.setPayloadVersion(payloadVersion);
1044 private MockRaftActorContext createFollowerActorContextWithLeader() {
1045 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1046 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1047 followerConfig.setElectionTimeoutFactor(10000);
1048 followerActorContext.setConfigParams(followerConfig);
1049 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1050 return followerActorContext;
1054 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1055 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1057 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1059 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1061 Follower follower = new Follower(followerActorContext);
1062 followerActor.underlyingActor().setBehavior(follower);
1064 Map<String, String> peerAddresses = new HashMap<>();
1065 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1067 leaderActorContext.setPeerAddresses(peerAddresses);
1069 leaderActorContext.getReplicatedLog().removeFrom(0);
1072 leaderActorContext.setReplicatedLog(
1073 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1075 leaderActorContext.setCommitIndex(1);
1077 followerActorContext.getReplicatedLog().removeFrom(0);
1079 // follower too has the exact same log entries and has the same commit index
1080 followerActorContext.setReplicatedLog(
1081 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1083 followerActorContext.setCommitIndex(1);
1085 leader = new Leader(leaderActorContext);
1087 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1089 assertEquals(1, appendEntries.getLeaderCommit());
1090 assertEquals(0, appendEntries.getEntries().size());
1091 assertEquals(0, appendEntries.getPrevLogIndex());
1093 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1094 leaderActor, AppendEntriesReply.class);
1096 assertEquals(2, appendEntriesReply.getLogLastIndex());
1097 assertEquals(1, appendEntriesReply.getLogLastTerm());
1099 // follower returns its next index
1100 assertEquals(2, appendEntriesReply.getLogLastIndex());
1101 assertEquals(1, appendEntriesReply.getLogLastTerm());
1107 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1108 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1110 MockRaftActorContext leaderActorContext = createActorContext();
1112 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1113 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1115 Follower follower = new Follower(followerActorContext);
1116 followerActor.underlyingActor().setBehavior(follower);
1118 Map<String, String> leaderPeerAddresses = new HashMap<>();
1119 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1121 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1123 leaderActorContext.getReplicatedLog().removeFrom(0);
1125 leaderActorContext.setReplicatedLog(
1126 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1128 leaderActorContext.setCommitIndex(1);
1130 followerActorContext.getReplicatedLog().removeFrom(0);
1132 followerActorContext.setReplicatedLog(
1133 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1135 // follower has the same log entries but its commit index > leaders commit index
1136 followerActorContext.setCommitIndex(2);
1138 leader = new Leader(leaderActorContext);
1140 // Initial heartbeat
1141 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1143 assertEquals(1, appendEntries.getLeaderCommit());
1144 assertEquals(0, appendEntries.getEntries().size());
1145 assertEquals(0, appendEntries.getPrevLogIndex());
1147 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1148 leaderActor, AppendEntriesReply.class);
1150 assertEquals(2, appendEntriesReply.getLogLastIndex());
1151 assertEquals(1, appendEntriesReply.getLogLastTerm());
1153 leaderActor.underlyingActor().setBehavior(follower);
1154 leader.handleMessage(followerActor, appendEntriesReply);
1156 leaderActor.underlyingActor().clear();
1157 followerActor.underlyingActor().clear();
1159 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1160 TimeUnit.MILLISECONDS);
1162 leader.handleMessage(leaderActor, new SendHeartBeat());
1164 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1166 assertEquals(2, appendEntries.getLeaderCommit());
1167 assertEquals(0, appendEntries.getEntries().size());
1168 assertEquals(2, appendEntries.getPrevLogIndex());
1170 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1172 assertEquals(2, appendEntriesReply.getLogLastIndex());
1173 assertEquals(1, appendEntriesReply.getLogLastTerm());
1175 assertEquals(2, followerActorContext.getCommitIndex());
1181 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1182 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1184 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1185 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1186 new FiniteDuration(1000, TimeUnit.SECONDS));
1188 leaderActorContext.setReplicatedLog(
1189 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1190 long leaderCommitIndex = 2;
1191 leaderActorContext.setCommitIndex(leaderCommitIndex);
1192 leaderActorContext.setLastApplied(leaderCommitIndex);
1194 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1195 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1197 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1199 followerActorContext.setReplicatedLog(
1200 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1201 followerActorContext.setCommitIndex(0);
1202 followerActorContext.setLastApplied(0);
1204 Follower follower = new Follower(followerActorContext);
1205 followerActor.underlyingActor().setBehavior(follower);
1207 leader = new Leader(leaderActorContext);
1209 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1210 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1212 MessageCollectorActor.clearMessages(followerActor);
1213 MessageCollectorActor.clearMessages(leaderActor);
1215 // Verify initial AppendEntries sent with the leader's current commit index.
1216 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1217 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1218 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1220 leaderActor.underlyingActor().setBehavior(leader);
1222 leader.handleMessage(followerActor, appendEntriesReply);
1224 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1225 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1227 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1228 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1229 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1231 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1232 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1233 appendEntries.getEntries().get(0).getData());
1234 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1235 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1236 appendEntries.getEntries().get(1).getData());
1238 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1239 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1241 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1243 ApplyState applyState = applyStateList.get(0);
1244 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1245 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1246 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1247 applyState.getReplicatedLogEntry().getData());
1249 applyState = applyStateList.get(1);
1250 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1251 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1252 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1253 applyState.getReplicatedLogEntry().getData());
1255 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1256 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1260 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1261 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1263 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1264 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1265 new FiniteDuration(1000, TimeUnit.SECONDS));
1267 leaderActorContext.setReplicatedLog(
1268 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1269 long leaderCommitIndex = 1;
1270 leaderActorContext.setCommitIndex(leaderCommitIndex);
1271 leaderActorContext.setLastApplied(leaderCommitIndex);
1273 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1274 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1276 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1278 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1279 followerActorContext.setCommitIndex(-1);
1280 followerActorContext.setLastApplied(-1);
1282 Follower follower = new Follower(followerActorContext);
1283 followerActor.underlyingActor().setBehavior(follower);
1285 leader = new Leader(leaderActorContext);
1287 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1288 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1290 MessageCollectorActor.clearMessages(followerActor);
1291 MessageCollectorActor.clearMessages(leaderActor);
1293 // Verify initial AppendEntries sent with the leader's current commit index.
1294 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1295 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1296 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1298 leaderActor.underlyingActor().setBehavior(leader);
1300 leader.handleMessage(followerActor, appendEntriesReply);
1302 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1303 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1305 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1306 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1307 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1309 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1310 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1311 appendEntries.getEntries().get(0).getData());
1312 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1313 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1314 appendEntries.getEntries().get(1).getData());
1316 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1317 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1319 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1321 ApplyState applyState = applyStateList.get(0);
1322 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1323 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1324 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1325 applyState.getReplicatedLogEntry().getData());
1327 applyState = applyStateList.get(1);
1328 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1329 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1330 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1331 applyState.getReplicatedLogEntry().getData());
1333 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1334 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1338 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1339 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1341 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1342 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1343 new FiniteDuration(1000, TimeUnit.SECONDS));
1345 leaderActorContext.setReplicatedLog(
1346 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1347 long leaderCommitIndex = 1;
1348 leaderActorContext.setCommitIndex(leaderCommitIndex);
1349 leaderActorContext.setLastApplied(leaderCommitIndex);
1351 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1352 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1354 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1356 followerActorContext.setReplicatedLog(
1357 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1358 followerActorContext.setCommitIndex(-1);
1359 followerActorContext.setLastApplied(-1);
1361 Follower follower = new Follower(followerActorContext);
1362 followerActor.underlyingActor().setBehavior(follower);
1364 leader = new Leader(leaderActorContext);
1366 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1367 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1369 MessageCollectorActor.clearMessages(followerActor);
1370 MessageCollectorActor.clearMessages(leaderActor);
1372 // Verify initial AppendEntries sent with the leader's current commit index.
1373 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1374 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1375 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1377 leaderActor.underlyingActor().setBehavior(leader);
1379 leader.handleMessage(followerActor, appendEntriesReply);
1381 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1382 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1384 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1385 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1386 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1388 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1389 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1390 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1391 appendEntries.getEntries().get(0).getData());
1392 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1393 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1394 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1395 appendEntries.getEntries().get(1).getData());
1397 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1398 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1400 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1402 ApplyState applyState = applyStateList.get(0);
1403 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1404 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1405 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1406 applyState.getReplicatedLogEntry().getData());
1408 applyState = applyStateList.get(1);
1409 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1410 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1411 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1412 applyState.getReplicatedLogEntry().getData());
1414 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1415 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1416 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1420 public void testHandleAppendEntriesReplySuccess() throws Exception {
1421 logStart("testHandleAppendEntriesReplySuccess");
1423 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1425 leaderActorContext.setReplicatedLog(
1426 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1428 leaderActorContext.setCommitIndex(1);
1429 leaderActorContext.setLastApplied(1);
1430 leaderActorContext.getTermInformation().update(1, "leader");
1432 leader = new Leader(leaderActorContext);
1434 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1436 short payloadVersion = 5;
1437 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1439 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1441 assertEquals(RaftState.Leader, raftActorBehavior.state());
1443 assertEquals(2, leaderActorContext.getCommitIndex());
1445 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1446 leaderActor, ApplyJournalEntries.class);
1448 assertEquals(2, leaderActorContext.getLastApplied());
1450 assertEquals(2, applyJournalEntries.getToIndex());
1452 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1455 assertEquals(1,applyStateList.size());
1457 ApplyState applyState = applyStateList.get(0);
1459 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1461 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1462 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1466 public void testHandleAppendEntriesReplyUnknownFollower(){
1467 logStart("testHandleAppendEntriesReplyUnknownFollower");
1469 MockRaftActorContext leaderActorContext = createActorContext();
1471 leader = new Leader(leaderActorContext);
1473 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1475 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1477 assertEquals(RaftState.Leader, raftActorBehavior.state());
1481 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1482 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1484 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1485 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1486 new FiniteDuration(1000, TimeUnit.SECONDS));
1487 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2);
1489 leaderActorContext.setReplicatedLog(
1490 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1491 long leaderCommitIndex = 3;
1492 leaderActorContext.setCommitIndex(leaderCommitIndex);
1493 leaderActorContext.setLastApplied(leaderCommitIndex);
1495 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1496 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1497 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1498 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1500 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1502 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1503 followerActorContext.setCommitIndex(-1);
1504 followerActorContext.setLastApplied(-1);
1506 Follower follower = new Follower(followerActorContext);
1507 followerActor.underlyingActor().setBehavior(follower);
1509 leader = new Leader(leaderActorContext);
1511 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1512 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1514 MessageCollectorActor.clearMessages(followerActor);
1515 MessageCollectorActor.clearMessages(leaderActor);
1517 // Verify initial AppendEntries sent with the leader's current commit index.
1518 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1519 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1520 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1522 leaderActor.underlyingActor().setBehavior(leader);
1524 leader.handleMessage(followerActor, appendEntriesReply);
1526 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1527 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1529 appendEntries = appendEntriesList.get(0);
1530 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1531 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1532 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1534 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1535 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1536 appendEntries.getEntries().get(0).getData());
1537 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1538 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1539 appendEntries.getEntries().get(1).getData());
1541 appendEntries = appendEntriesList.get(1);
1542 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1543 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1544 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1546 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1547 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1548 appendEntries.getEntries().get(0).getData());
1549 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1550 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1551 appendEntries.getEntries().get(1).getData());
1553 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1554 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1556 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1558 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1559 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1563 public void testHandleRequestVoteReply(){
1564 logStart("testHandleRequestVoteReply");
1566 MockRaftActorContext leaderActorContext = createActorContext();
1568 leader = new Leader(leaderActorContext);
1570 // Should be a no-op.
1571 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1572 new RequestVoteReply(1, true));
1574 assertEquals(RaftState.Leader, raftActorBehavior.state());
1576 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1578 assertEquals(RaftState.Leader, raftActorBehavior.state());
1582 public void testIsolatedLeaderCheckNoFollowers() {
1583 logStart("testIsolatedLeaderCheckNoFollowers");
1585 MockRaftActorContext leaderActorContext = createActorContext();
1587 leader = new Leader(leaderActorContext);
1588 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1589 Assert.assertTrue(behavior instanceof Leader);
1593 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1594 logStart("testIsolatedLeaderCheckTwoFollowers");
1596 new JavaTestKit(getSystem()) {{
1598 ActorRef followerActor1 = getTestActor();
1599 ActorRef followerActor2 = getTestActor();
1601 MockRaftActorContext leaderActorContext = createActorContext();
1603 Map<String, String> peerAddresses = new HashMap<>();
1604 peerAddresses.put("follower-1", followerActor1.path().toString());
1605 peerAddresses.put("follower-2", followerActor2.path().toString());
1607 leaderActorContext.setPeerAddresses(peerAddresses);
1609 leader = new Leader(leaderActorContext);
1611 leader.markFollowerActive("follower-1");
1612 leader.markFollowerActive("follower-2");
1613 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1614 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1615 behavior instanceof Leader);
1617 // kill 1 follower and verify if that got killed
1618 final JavaTestKit probe = new JavaTestKit(getSystem());
1619 probe.watch(followerActor1);
1620 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1621 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1622 assertEquals(termMsg1.getActor(), followerActor1);
1624 leader.markFollowerInActive("follower-1");
1625 leader.markFollowerActive("follower-2");
1626 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1627 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1628 behavior instanceof Leader);
1630 // kill 2nd follower and leader should change to Isolated leader
1631 followerActor2.tell(PoisonPill.getInstance(), null);
1632 probe.watch(followerActor2);
1633 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1634 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1635 assertEquals(termMsg2.getActor(), followerActor2);
1637 leader.markFollowerInActive("follower-2");
1638 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1639 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1640 behavior instanceof IsolatedLeader);
1645 public void testLaggingFollowerStarvation() throws Exception {
1646 logStart("testLaggingFollowerStarvation");
1647 new JavaTestKit(getSystem()) {{
1648 String leaderActorId = actorFactory.generateActorId("leader");
1649 String follower1ActorId = actorFactory.generateActorId("follower");
1650 String follower2ActorId = actorFactory.generateActorId("follower");
1652 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1653 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1654 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1655 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1657 MockRaftActorContext leaderActorContext =
1658 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1660 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1661 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1662 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1664 leaderActorContext.setConfigParams(configParams);
1666 leaderActorContext.setReplicatedLog(
1667 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1669 Map<String, String> peerAddresses = new HashMap<>();
1670 peerAddresses.put(follower1ActorId,
1671 follower1Actor.path().toString());
1672 peerAddresses.put(follower2ActorId,
1673 follower2Actor.path().toString());
1675 leaderActorContext.setPeerAddresses(peerAddresses);
1676 leaderActorContext.getTermInformation().update(1, leaderActorId);
1678 RaftActorBehavior leader = createBehavior(leaderActorContext);
1680 leaderActor.underlyingActor().setBehavior(leader);
1682 for(int i=1;i<6;i++) {
1683 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1684 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1685 assertTrue(newBehavior == leader);
1686 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1689 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1690 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1692 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1693 heartbeats.size() > 1);
1695 // Check if follower-2 got AppendEntries during this time and was not starved
1696 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1698 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1699 appendEntries.size() > 1);
1705 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1706 ActorRef actorRef, RaftRPC rpc) throws Exception {
1707 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1708 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1711 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1713 private final long electionTimeOutIntervalMillis;
1714 private final int snapshotChunkSize;
1716 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1718 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1719 this.snapshotChunkSize = snapshotChunkSize;
1723 public FiniteDuration getElectionTimeOutInterval() {
1724 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1728 public int getSnapshotChunkSize() {
1729 return snapshotChunkSize;