2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.actor.Terminated;
18 import akka.testkit.JavaTestKit;
19 import akka.testkit.TestActorRef;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.util.concurrent.Uninterruptibles;
22 import com.google.protobuf.ByteString;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.List;
27 import java.util.concurrent.TimeUnit;
28 import org.junit.After;
29 import org.junit.Assert;
30 import org.junit.Test;
31 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
32 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
33 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
34 import org.opendaylight.controller.cluster.raft.RaftActorContext;
35 import org.opendaylight.controller.cluster.raft.RaftState;
36 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
37 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
38 import org.opendaylight.controller.cluster.raft.SerializationUtils;
39 import org.opendaylight.controller.cluster.raft.Snapshot;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
41 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
42 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
44 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
45 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
46 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
47 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
48 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
49 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
50 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
51 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
52 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
53 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
54 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
55 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
56 import scala.concurrent.duration.FiniteDuration;
58 public class LeaderTest extends AbstractLeaderTest {
// Follower id used when building follower replies and when looking up the follower's state on the leader.
60 static final String FOLLOWER_ID = "follower";
// Leader id constant.
61 public static final String LEADER_ID = "leader";
// Test actor standing in for the leader: passed as the sender of leader-side messages (e.g. SendHeartBeat)
// and inspected for messages such as ApplyState.
63 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
64 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
// Test actor standing in for the follower: the tests inspect the AppendEntries / InstallSnapshot
// messages it collects.
66 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
67 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
// Leader behavior under test; each test constructs its own instance.
69 private Leader leader;
// Payload version constant (5).
70 private final short payloadVersion = 5;
74 public void tearDown() throws Exception {
// Verifies that handing the leader an arbitrary String message leaves it in the Leader behavior.
83 public void testHandleMessageForUnknownMessage() throws Exception {
84 logStart("testHandleMessageForUnknownMessage");
86 leader = new Leader(createActorContext());
88 // handleMessage should return the Leader behavior when it receives an unknown message
90 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
91 Assert.assertTrue(behavior instanceof Leader);
// Verifies heartbeat traffic toward the follower: a newly created leader immediately sends an
// AppendEntries with no entries, and once the follower has replied, a SendHeartBeat produces an
// AppendEntries carrying the single entry the follower is still missing.
95 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
96 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
98 MockRaftActorContext actorContext = createActorContextWithFollower();
// Uses the class-level payloadVersion field directly; the former local of the same name and
// value only shadowed that field.
100 actorContext.setPayloadVersion(payloadVersion);
103 actorContext.getTermInformation().update(term, "");
105 leader = new Leader(actorContext);
107 // Leader should send an immediate heartbeat with no entries as follower is inactive.
108 long lastIndex = actorContext.getReplicatedLog().lastIndex();
109 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
110 assertEquals("getTerm", term, appendEntries.getTerm());
111 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
112 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
113 assertEquals("Entries size", 0, appendEntries.getEntries().size());
114 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
116 // The follower would normally reply - simulate that explicitly here.
// The reply reports lastIndex - 1 so that exactly one entry (lastIndex) remains to be sent.
117 leader.handleMessage(followerActor, new AppendEntriesReply(
118 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
119 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the messages collected so far so only the next AppendEntries is examined.
121 followerActor.underlyingActor().clear();
123 // Sleep for the heartbeat interval so AppendEntries is sent.
124 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
125 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
127 leader.handleMessage(leaderActor, new SendHeartBeat());
129 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
130 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
131 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
132 assertEquals("Entries size", 1, appendEntries.getEntries().size());
133 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
134 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
135 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
// Helper: builds a log entry with a "foo" payload, appends it to the context's replicated log and
// hands the leader a Replicate message for it. Returns the behavior produced by handleMessage.
// NOTE(review): the entry's constructor arguments are not visible in this excerpt; presumably
// `index` is used as the entry's log index - confirm.
139 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
140 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
141 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
143 actorContext.getReplicatedLog().append(newEntry);
// The Replicate message is sent with null client actor and null identifier.
144 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
// Verifies that a Replicate message makes the leader send the new entry to an active follower in
// an AppendEntries, while the commit index stays at the previous last index.
148 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
149 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
151 MockRaftActorContext actorContext = createActorContextWithFollower();
154 actorContext.getTermInformation().update(term, "");
156 leader = new Leader(actorContext);
158 // Leader will send an immediate heartbeat - ignore it.
159 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
161 // The follower would normally reply - simulate that explicitly here.
162 long lastIndex = actorContext.getReplicatedLog().lastIndex();
163 leader.handleMessage(followerActor, new AppendEntriesReply(
164 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
165 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only the AppendEntries caused by Replicate is examined.
167 followerActor.underlyingActor().clear();
169 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
171 // State should not change
172 assertTrue(raftBehavior instanceof Leader);
174 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
175 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
176 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
177 assertEquals("Entries size", 1, appendEntries.getEntries().size());
178 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
179 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
180 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// The new entry (lastIndex + 1) is not expected to be committed at this point.
181 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
// Verifies that, with the raft policy installed below, a Replicate message both sends the new
// entry to the follower and advances the commit index to the new entry right away.
185 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
186 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
188 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): the meaning of the two boolean flags is not visible here; judging by the test
// name, presumably one of them permits committing before follower consensus - confirm.
189 actorContext.setRaftPolicy(createRaftPolicy(true, true));
192 actorContext.getTermInformation().update(term, "");
194 leader = new Leader(actorContext);
196 // Leader will send an immediate heartbeat - ignore it.
197 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
199 // The follower would normally reply - simulate that explicitly here.
200 long lastIndex = actorContext.getReplicatedLog().lastIndex();
201 leader.handleMessage(followerActor, new AppendEntriesReply(
202 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
203 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only the AppendEntries caused by Replicate is examined.
205 followerActor.underlyingActor().clear();
207 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
209 // State should not change
210 assertTrue(raftBehavior instanceof Leader);
212 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
213 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
214 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
215 assertEquals("Entries size", 1, appendEntries.getEntries().size());
216 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
217 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
218 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// Here the commit index is expected to already include the new entry.
219 assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
// Verifies that several Replicate messages in a row, with no follower reply in between and no
// heartbeat interval expiry, result in only a single AppendEntries being sent to the follower.
223 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
// Log this test's own name (previously another test's name was logged here by copy-paste).
224 logStart("testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent");
226 MockRaftActorContext actorContext = createActorContextWithFollower();
// Long (5 second) heartbeat interval so it cannot elapse while the test runs.
227 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
229 public FiniteDuration getHeartBeatInterval() {
230 return FiniteDuration.apply(5, TimeUnit.SECONDS);
235 actorContext.getTermInformation().update(term, "");
237 leader = new Leader(actorContext);
239 // Leader will send an immediate heartbeat - ignore it.
240 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
242 // The follower would normally reply - simulate that explicitly here.
243 long lastIndex = actorContext.getReplicatedLog().lastIndex();
244 leader.handleMessage(followerActor, new AppendEntriesReply(
245 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
246 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only messages caused by the Replicate calls are counted.
248 followerActor.underlyingActor().clear();
// Replicate five consecutive entries without any follower reply in between.
250 for(int i=0;i<5;i++) {
251 sendReplicate(actorContext, lastIndex+i+1);
254 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
255 // We expect only 1 message to be sent because of two reasons,
256 // - an append entries reply was not received
257 // - the heartbeat interval has not expired
258 // In this scenario if multiple messages are sent they would likely be duplicates
259 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
// Verifies that each Replicate which is followed by a successful follower reply lets the next
// Replicate be sent immediately, while a Replicate issued without a preceding reply is held back.
263 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
264 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
266 MockRaftActorContext actorContext = createActorContextWithFollower();
// Long (5 second) heartbeat interval so it cannot elapse while the test runs.
267 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
269 public FiniteDuration getHeartBeatInterval() {
270 return FiniteDuration.apply(5, TimeUnit.SECONDS);
275 actorContext.getTermInformation().update(term, "");
277 leader = new Leader(actorContext);
279 // Leader will send an immediate heartbeat - ignore it.
280 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
282 // The follower would normally reply - simulate that explicitly here.
283 long lastIndex = actorContext.getReplicatedLog().lastIndex();
284 leader.handleMessage(followerActor, new AppendEntriesReply(
285 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
286 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only messages caused by the Replicate calls are counted.
288 followerActor.underlyingActor().clear();
// First three entries: each Replicate is acknowledged by the follower before the next one.
290 for(int i=0;i<3;i++) {
291 sendReplicate(actorContext, lastIndex+i+1);
292 leader.handleMessage(followerActor, new AppendEntriesReply(
293 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
// Last two entries: replicated back to back with no follower reply.
297 for(int i=3;i<5;i++) {
298 sendReplicate(actorContext, lastIndex + i + 1);
301 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
302 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
303 // get sent to the follower - but not the 5th
304 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
// The i-th collected message must start with the i-th replicated entry, i.e. index
// lastIndex + i + 1. The expected value is passed first, per the JUnit assertEquals contract,
// and is derived from lastIndex rather than a hard-coded offset.
306 for(int i=0;i<4;i++) {
307 long actualIndex = allMessages.get(i).getEntries().get(0).getIndex();
308 assertEquals("Entry getIndex", lastIndex + i + 1, actualIndex);
// Verifies that when a replicated entry has not been acknowledged by the follower, the next
// heartbeat after the heartbeat interval re-sends that same entry.
313 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
314 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
316 MockRaftActorContext actorContext = createActorContextWithFollower();
// 500 ms heartbeat interval; the test sleeps 750 ms below so the interval has elapsed.
317 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
319 public FiniteDuration getHeartBeatInterval() {
320 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
325 actorContext.getTermInformation().update(term, "");
327 leader = new Leader(actorContext);
329 // Leader will send an immediate heartbeat - ignore it.
330 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
332 // The follower would normally reply - simulate that explicitly here.
333 long lastIndex = actorContext.getReplicatedLog().lastIndex();
334 leader.handleMessage(followerActor, new AppendEntriesReply(
335 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
336 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only the two messages of interest are collected.
338 followerActor.underlyingActor().clear();
340 sendReplicate(actorContext, lastIndex+1);
342 // Wait slightly longer than heartbeat duration
343 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
345 leader.handleMessage(leaderActor, new SendHeartBeat());
347 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
348 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// First message: the AppendEntries triggered by Replicate.
350 assertEquals(1, allMessages.get(0).getEntries().size());
351 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
// Second message: the duplicate sent on heartbeat. Its entry index is checked on message 1
// (previously message 0 was checked twice, leaving the duplicate's index unverified).
352 assertEquals(1, allMessages.get(1).getEntries().size());
353 assertEquals(lastIndex+1, allMessages.get(1).getEntries().get(0).getIndex());
// Verifies that every SendHeartBeat handled after the heartbeat interval has elapsed results in an
// AppendEntries being sent to the follower.
358 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
359 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
361 MockRaftActorContext actorContext = createActorContextWithFollower();
// 100 ms heartbeat interval; each iteration below sleeps 150 ms so the interval has elapsed.
362 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
364 public FiniteDuration getHeartBeatInterval() {
365 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
370 actorContext.getTermInformation().update(term, "");
372 leader = new Leader(actorContext);
374 // Leader will send an immediate heartbeat - ignore it.
375 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
377 // The follower would normally reply - simulate that explicitly here.
378 long lastIndex = actorContext.getReplicatedLog().lastIndex();
379 leader.handleMessage(followerActor, new AppendEntriesReply(
380 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
381 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only the three heartbeats below are counted.
383 followerActor.underlyingActor().clear();
385 for(int i=0;i<3;i++) {
386 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
387 leader.handleMessage(leaderActor, new SendHeartBeat());
390 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
391 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
// Verifies that a Replicate handled right after a heartbeat is still sent to the follower: the
// heartbeat yields an empty AppendEntries and the Replicate yields one carrying the new entry.
395 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
396 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
398 MockRaftActorContext actorContext = createActorContextWithFollower();
// 100 ms heartbeat interval; the test sleeps 150 ms below so the interval has elapsed.
399 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
401 public FiniteDuration getHeartBeatInterval() {
402 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
407 actorContext.getTermInformation().update(term, "");
409 leader = new Leader(actorContext);
411 // Leader will send an immediate heartbeat - ignore it.
412 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
414 // The follower would normally reply - simulate that explicitly here.
415 long lastIndex = actorContext.getReplicatedLog().lastIndex();
416 leader.handleMessage(followerActor, new AppendEntriesReply(
417 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
418 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only the two messages of interest are collected.
420 followerActor.underlyingActor().clear();
422 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
423 leader.handleMessage(leaderActor, new SendHeartBeat());
424 sendReplicate(actorContext, lastIndex+1);
426 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
427 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// Message 0 is the heartbeat (no entries); message 1 carries the replicated entry.
429 assertEquals(0, allMessages.get(0).getEntries().size());
430 assertEquals(1, allMessages.get(1).getEntries().size());
// Verifies that a leader created from a context without followers commits a replicated entry
// immediately and emits ApplyState messages for every entry after lastApplied up to the new one.
435 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
436 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
438 MockRaftActorContext actorContext = createActorContext();
440 leader = new Leader(actorContext);
442 actorContext.setLastApplied(0);
444 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
445 long term = actorContext.getTermInformation().getCurrentTerm();
446 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
447 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
449 actorContext.getReplicatedLog().append(newEntry);
451 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
452 new Replicate(leaderActor, "state-id", newEntry));
454 // State should not change
455 assertTrue(raftBehavior instanceof Leader);
457 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
459 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
460 // one since lastApplied state is 0.
461 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
462 leaderActor, ApplyState.class);
463 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
// The i-th ApplyState is expected to carry the entry with index i + 1 in the current term.
465 for(int i = 0; i <= newLogIndex - 1; i++ ) {
466 ApplyState applyState = applyStateList.get(i);
467 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
468 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
// The last ApplyState must be for the new entry and carry the Replicate's identifier.
471 ApplyState last = applyStateList.get((int) newLogIndex - 1);
472 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
473 assertEquals("getIdentifier", "state-id", last.getIdentifier());
// Verifies heartbeat handling while a snapshot install to the follower is in progress: before the
// chunk is acknowledged a heartbeat sends an AppendEntries with no entries; after the chunk is
// marked as sent successfully, the next heartbeat sends an InstallSnapshot.
477 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
478 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
480 MockRaftActorContext actorContext = createActorContextWithFollower();
// State that is serialized below and used as the leader's snapshot data.
482 Map<String, String> leadersSnapshot = new HashMap<>();
483 leadersSnapshot.put("1", "A");
484 leadersSnapshot.put("2", "B");
485 leadersSnapshot.put("3", "C");
// Remove all entries from the leader's replicated log.
488 actorContext.getReplicatedLog().removeFrom(0);
490 final int commitIndex = 3;
491 final int snapshotIndex = 2;
492 final int newEntryIndex = 4;
493 final int snapshotTerm = 1;
494 final int currentTerm = 2;
496 // set the snapshot variables in replicatedlog
497 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
498 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
499 actorContext.setCommitIndex(commitIndex);
500 //set follower timeout to 2 mins, helps during debugging
501 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
503 leader = new Leader(actorContext);
// Put the follower's tracked indices at the very beginning of the log.
505 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
506 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// NOTE(review): `entry` is not referenced by any line visible in this excerpt - confirm whether
// it is still needed.
509 ReplicatedLogImplEntry entry =
510 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
511 new MockRaftActorContext.MockPayload("D"));
513 //update follower timestamp
514 leader.markFollowerActive(FOLLOWER_ID);
// Install the serialized snapshot on the leader and register an in-progress snapshot transfer
// for the follower.
516 ByteString bs = toByteString(leadersSnapshot);
517 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
518 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
519 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
520 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
522 //send first chunk and no InstallSnapshotReply received yet
524 fts.incrementChunkIndex();
// Wait out the heartbeat interval before triggering the heartbeat.
526 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
527 TimeUnit.MILLISECONDS);
529 leader.handleMessage(leaderActor, new SendHeartBeat());
531 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
533 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
535 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
537 //InstallSnapshotReply received
538 fts.markSendStatus(true);
540 leader.handleMessage(leaderActor, new SendHeartBeat());
542 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
544 assertEquals(commitIndex, is.getLastIncludedIndex());
// Verifies that replicating an entry while the leader's log has been trimmed (snapshot index set,
// log emptied) leaves the leader in the Leader behavior and starts a snapshot capture.
548 public void testSendAppendEntriesSnapshotScenario() throws Exception {
549 logStart("testSendAppendEntriesSnapshotScenario");
551 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): leadersSnapshot is populated but not referenced by any other line visible in
// this excerpt - confirm whether it is still needed.
553 Map<String, String> leadersSnapshot = new HashMap<>();
554 leadersSnapshot.put("1", "A");
555 leadersSnapshot.put("2", "B");
556 leadersSnapshot.put("3", "C");
// Remove all entries from the leader's replicated log.
559 actorContext.getReplicatedLog().removeFrom(0);
561 final int followersLastIndex = 2;
562 final int snapshotIndex = 3;
563 final int newEntryIndex = 4;
564 final int snapshotTerm = 1;
565 final int currentTerm = 2;
567 // set the snapshot variables in replicatedlog
568 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
569 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
570 actorContext.setCommitIndex(followersLastIndex);
572 leader = new Leader(actorContext);
574 // Leader will send an immediate heartbeat - ignore it.
575 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// New entry at index 4 in term 2, appended to the otherwise empty log.
578 ReplicatedLogImplEntry entry =
579 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
580 new MockRaftActorContext.MockPayload("D"));
582 actorContext.getReplicatedLog().append(entry);
584 //update follower timestamp
585 leader.markFollowerActive(FOLLOWER_ID);
587 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
588 RaftActorBehavior raftBehavior = leader.handleMessage(
589 leaderActor, new Replicate(null, "state-id", entry));
591 assertTrue(raftBehavior instanceof Leader);
// A snapshot capture must have been started via the snapshot manager.
593 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
// Verifies that, with no snapshot held by the leader, a Replicate starts a snapshot capture with
// the expected indices/terms, and that a second Replicate while that capture is in progress does
// not start another one.
597 public void testInitiateInstallSnapshot() throws Exception {
598 logStart("testInitiateInstallSnapshot");
600 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): leadersSnapshot is populated but not referenced by any other line visible in
// this excerpt - confirm whether it is still needed.
602 Map<String, String> leadersSnapshot = new HashMap<>();
603 leadersSnapshot.put("1", "A");
604 leadersSnapshot.put("2", "B");
605 leadersSnapshot.put("3", "C");
// Remove all entries from the leader's replicated log.
608 actorContext.getReplicatedLog().removeFrom(0);
610 final int followersLastIndex = 2;
611 final int snapshotIndex = 3;
612 final int newEntryIndex = 4;
613 final int snapshotTerm = 1;
614 final int currentTerm = 2;
616 // set the snapshot variables in replicatedlog
617 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
618 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
619 actorContext.setLastApplied(3);
620 actorContext.setCommitIndex(followersLastIndex);
622 leader = new Leader(actorContext);
624 // Leader will send an immediate heartbeat - ignore it.
625 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
627 // set the snapshot as absent and check if capture-snapshot is invoked.
628 leader.setSnapshot(null);
// New entry at index 4 in term 2, appended to the otherwise empty log.
631 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
632 new MockRaftActorContext.MockPayload("D"));
634 actorContext.getReplicatedLog().append(entry);
636 //update follower timestamp
637 leader.markFollowerActive(FOLLOWER_ID);
639 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
641 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
643 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
// Expected values mirror the setup above: lastApplied 3 / snapshot term 1, and the appended
// entry at index 4 / term 2.
645 assertTrue(cs.isInstallSnapshotInitiated());
646 assertEquals(3, cs.getLastAppliedIndex());
647 assertEquals(1, cs.getLastAppliedTerm());
648 assertEquals(4, cs.getLastIndex());
649 assertEquals(2, cs.getLastTerm());
651 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
652 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
// The very same CaptureSnapshot instance must still be the one registered.
654 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
// Verifies that handling SendInstallSnapshot makes the leader send an InstallSnapshot to the
// follower carrying the snapshot's last included index/term and the leader's current term.
658 public void testInstallSnapshot() throws Exception {
659 logStart("testInstallSnapshot");
661 MockRaftActorContext actorContext = createActorContextWithFollower();
// State that is serialized below and used as the snapshot data.
663 Map<String, String> leadersSnapshot = new HashMap<>();
664 leadersSnapshot.put("1", "A");
665 leadersSnapshot.put("2", "B");
666 leadersSnapshot.put("3", "C");
// Remove all entries from the leader's replicated log.
669 actorContext.getReplicatedLog().removeFrom(0);
671 final int lastAppliedIndex = 3;
672 final int snapshotIndex = 2;
673 final int snapshotTerm = 1;
674 final int currentTerm = 2;
676 // set the snapshot variables in replicatedlog
677 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
678 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
679 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
680 actorContext.setCommitIndex(lastAppliedIndex);
681 actorContext.setLastApplied(lastAppliedIndex);
683 leader = new Leader(actorContext);
685 // Initial heartbeat.
686 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Put the follower's tracked indices at the very beginning of the log.
688 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
689 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
691 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
692 Collections.<ReplicatedLogEntry>emptyList(),
693 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
695 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
697 assertTrue(raftBehavior instanceof Leader);
699 // check if installsnapshot gets called with the correct values.
701 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
703 assertNotNull(installSnapshot.getData());
704 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
705 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
707 assertEquals(currentTerm, installSnapshot.getTerm());
// Verifies that a successful InstallSnapshotReply for the last chunk ends the follower's snapshot
// transfer and moves the follower's match/next indices to the snapshot's last index.
711 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
712 logStart("testHandleInstallSnapshotReplyLastChunk");
714 MockRaftActorContext actorContext = createActorContextWithFollower();
716 final int commitIndex = 3;
717 final int snapshotIndex = 2;
718 final int snapshotTerm = 1;
719 final int currentTerm = 2;
721 actorContext.setCommitIndex(commitIndex);
723 leader = new Leader(actorContext);
// Put the follower's tracked indices at the very beginning of the log.
725 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
726 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
728 // Ignore initial heartbeat.
729 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// State that is serialized below and used as the leader's snapshot data.
731 Map<String, String> leadersSnapshot = new HashMap<>();
732 leadersSnapshot.put("1", "A");
733 leadersSnapshot.put("2", "B");
734 leadersSnapshot.put("3", "C");
736 // set the snapshot variables in replicatedlog
738 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
739 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
740 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
// Install the serialized snapshot on the leader and register a snapshot transfer for the follower.
742 ByteString bs = toByteString(leadersSnapshot);
743 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
744 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
745 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
746 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
// Advance the transfer's chunk index until it designates the last chunk.
747 while(!fts.isLastChunk(fts.getChunkIndex())) {
749 fts.incrementChunkIndex();
// Remove all entries from the leader's replicated log.
753 actorContext.getReplicatedLog().removeFrom(0);
// Simulate the follower acknowledging the last chunk successfully.
755 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
756 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
758 assertTrue(raftBehavior instanceof Leader);
// No snapshot transfer should remain tracked, while the follower itself is still tracked.
760 assertEquals(0, leader.followerSnapshotSize());
761 assertEquals(1, leader.followerLogSize());
762 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
764 assertEquals(commitIndex, fli.getMatchIndex());
765 assertEquals(commitIndex + 1, fli.getNextIndex());
// Verifies chunked snapshot transfer: each successful InstallSnapshotReply triggers sending the
// next chunk, and a reply received after the final chunk does not trigger another InstallSnapshot.
769 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
770 logStart("testSendSnapshotfromInstallSnapshotReply");
772 MockRaftActorContext actorContext = createActorContextWithFollower();
774 final int commitIndex = 3;
775 final int snapshotIndex = 2;
776 final int snapshotTerm = 1;
777 final int currentTerm = 2;
// Overrides the snapshot chunk size (the returned value is not visible in this excerpt); the
// assertions below expect the snapshot to be split into 3 chunks.
779 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
781 public int getSnapshotChunkSize() {
// Long heartbeat (9 s) and isolated-leader-check (10 s) intervals so neither fires during the test.
785 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
786 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
788 actorContext.setConfigParams(configParams);
789 actorContext.setCommitIndex(commitIndex);
791 leader = new Leader(actorContext);
// Put the follower's tracked indices at the very beginning of the log.
793 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
794 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// State that is serialized below and used as the leader's snapshot data.
796 Map<String, String> leadersSnapshot = new HashMap<>();
797 leadersSnapshot.put("1", "A");
798 leadersSnapshot.put("2", "B");
799 leadersSnapshot.put("3", "C");
801 // set the snapshot variables in replicatedlog
802 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
803 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
804 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
806 ByteString bs = toByteString(leadersSnapshot);
807 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
808 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
809 leader.setSnapshot(snapshot);
// Kick off the transfer; the first chunk is expected at the follower.
811 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
813 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
815 assertEquals(1, installSnapshot.getChunkIndex());
816 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 1; chunk 2 is expected next.
818 followerActor.underlyingActor().clear();
819 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
820 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
822 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
824 assertEquals(2, installSnapshot.getChunkIndex());
825 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 2; a further InstallSnapshot is expected.
827 followerActor.underlyingActor().clear();
828 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
829 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
831 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
833 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
834 followerActor.underlyingActor().clear();
835 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
836 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
// getFirstMatching (unlike expectFirstMatching) is used here because no message is expected.
838 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
840 Assert.assertNull(installSnapshot);
845 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
846 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
848 MockRaftActorContext actorContext = createActorContextWithFollower();
850 final int commitIndex = 3;
851 final int snapshotIndex = 2;
852 final int snapshotTerm = 1;
853 final int currentTerm = 2;
855 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
857 public int getSnapshotChunkSize() {
862 actorContext.setCommitIndex(commitIndex);
864 leader = new Leader(actorContext);
866 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
867 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
869 Map<String, String> leadersSnapshot = new HashMap<>();
870 leadersSnapshot.put("1", "A");
871 leadersSnapshot.put("2", "B");
872 leadersSnapshot.put("3", "C");
874 // set the snapshot variables in replicatedlog
875 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
876 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
877 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
879 ByteString bs = toByteString(leadersSnapshot);
880 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
881 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
882 leader.setSnapshot(snapshot);
884 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
885 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
887 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
889 assertEquals(1, installSnapshot.getChunkIndex());
890 assertEquals(3, installSnapshot.getTotalChunks());
892 followerActor.underlyingActor().clear();
894 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
895 FOLLOWER_ID, -1, false));
897 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
898 TimeUnit.MILLISECONDS);
900 leader.handleMessage(leaderActor, new SendHeartBeat());
902 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
904 assertEquals(1, installSnapshot.getChunkIndex());
905 assertEquals(3, installSnapshot.getTotalChunks());
909 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
910 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
912 MockRaftActorContext actorContext = createActorContextWithFollower();
914 final int commitIndex = 3;
915 final int snapshotIndex = 2;
916 final int snapshotTerm = 1;
917 final int currentTerm = 2;
919 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
921 public int getSnapshotChunkSize() {
926 actorContext.setCommitIndex(commitIndex);
928 leader = new Leader(actorContext);
930 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
931 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
933 Map<String, String> leadersSnapshot = new HashMap<>();
934 leadersSnapshot.put("1", "A");
935 leadersSnapshot.put("2", "B");
936 leadersSnapshot.put("3", "C");
938 // set the snapshot variables in replicatedlog
939 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
940 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
941 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
943 ByteString bs = toByteString(leadersSnapshot);
944 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
945 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
946 leader.setSnapshot(snapshot);
948 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
950 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
952 assertEquals(1, installSnapshot.getChunkIndex());
953 assertEquals(3, installSnapshot.getTotalChunks());
954 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
956 int hashCode = installSnapshot.getData().hashCode();
958 followerActor.underlyingActor().clear();
960 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
961 FOLLOWER_ID, 1, true));
963 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
965 assertEquals(2, installSnapshot.getChunkIndex());
966 assertEquals(3, installSnapshot.getTotalChunks());
967 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
971 public void testFollowerToSnapshotLogic() {
972 logStart("testFollowerToSnapshotLogic");
974 MockRaftActorContext actorContext = createActorContext();
976 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
978 public int getSnapshotChunkSize() {
983 leader = new Leader(actorContext);
985 Map<String, String> leadersSnapshot = new HashMap<>();
986 leadersSnapshot.put("1", "A");
987 leadersSnapshot.put("2", "B");
988 leadersSnapshot.put("3", "C");
990 ByteString bs = toByteString(leadersSnapshot);
991 byte[] barray = bs.toByteArray();
993 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
994 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
996 assertEquals(bs.size(), barray.length);
999 for (int i=0; i < barray.length; i = i + 50) {
1003 if (i + 50 > barray.length) {
1007 ByteString chunk = fts.getNextChunk();
1008 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
1009 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1011 fts.markSendStatus(true);
1012 if (!fts.isLastChunk(chunkIndex)) {
1013 fts.incrementChunkIndex();
1017 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1020 @Override protected RaftActorBehavior createBehavior(
1021 RaftActorContext actorContext) {
1022 return new Leader(actorContext);
1026 protected MockRaftActorContext createActorContext() {
1027 return createActorContext(leaderActor);
1031 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1032 return createActorContext(LEADER_ID, actorRef);
1035 private MockRaftActorContext createActorContextWithFollower() {
1036 MockRaftActorContext actorContext = createActorContext();
1037 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1038 followerActor.path().toString()).build());
1039 return actorContext;
1042 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1043 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1044 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1045 configParams.setElectionTimeoutFactor(100000);
1046 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1047 context.setConfigParams(configParams);
1048 context.setPayloadVersion(payloadVersion);
1052 private MockRaftActorContext createFollowerActorContextWithLeader() {
1053 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1054 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1055 followerConfig.setElectionTimeoutFactor(10000);
1056 followerActorContext.setConfigParams(followerConfig);
1057 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1058 return followerActorContext;
1062 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1063 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1065 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1067 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1069 Follower follower = new Follower(followerActorContext);
1070 followerActor.underlyingActor().setBehavior(follower);
1072 Map<String, String> peerAddresses = new HashMap<>();
1073 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1075 leaderActorContext.setPeerAddresses(peerAddresses);
1077 leaderActorContext.getReplicatedLog().removeFrom(0);
1080 leaderActorContext.setReplicatedLog(
1081 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1083 leaderActorContext.setCommitIndex(1);
1085 followerActorContext.getReplicatedLog().removeFrom(0);
1087 // follower too has the exact same log entries and has the same commit index
1088 followerActorContext.setReplicatedLog(
1089 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1091 followerActorContext.setCommitIndex(1);
1093 leader = new Leader(leaderActorContext);
1095 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1097 assertEquals(1, appendEntries.getLeaderCommit());
1098 assertEquals(0, appendEntries.getEntries().size());
1099 assertEquals(0, appendEntries.getPrevLogIndex());
1101 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1102 leaderActor, AppendEntriesReply.class);
1104 assertEquals(2, appendEntriesReply.getLogLastIndex());
1105 assertEquals(1, appendEntriesReply.getLogLastTerm());
1107 // follower returns its next index
1108 assertEquals(2, appendEntriesReply.getLogLastIndex());
1109 assertEquals(1, appendEntriesReply.getLogLastTerm());
1115 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1116 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1118 MockRaftActorContext leaderActorContext = createActorContext();
1120 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1121 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1123 Follower follower = new Follower(followerActorContext);
1124 followerActor.underlyingActor().setBehavior(follower);
1126 Map<String, String> leaderPeerAddresses = new HashMap<>();
1127 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1129 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1131 leaderActorContext.getReplicatedLog().removeFrom(0);
1133 leaderActorContext.setReplicatedLog(
1134 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1136 leaderActorContext.setCommitIndex(1);
1138 followerActorContext.getReplicatedLog().removeFrom(0);
1140 followerActorContext.setReplicatedLog(
1141 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1143 // follower has the same log entries but its commit index > leaders commit index
1144 followerActorContext.setCommitIndex(2);
1146 leader = new Leader(leaderActorContext);
1148 // Initial heartbeat
1149 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1151 assertEquals(1, appendEntries.getLeaderCommit());
1152 assertEquals(0, appendEntries.getEntries().size());
1153 assertEquals(0, appendEntries.getPrevLogIndex());
1155 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1156 leaderActor, AppendEntriesReply.class);
1158 assertEquals(2, appendEntriesReply.getLogLastIndex());
1159 assertEquals(1, appendEntriesReply.getLogLastTerm());
1161 leaderActor.underlyingActor().setBehavior(follower);
1162 leader.handleMessage(followerActor, appendEntriesReply);
1164 leaderActor.underlyingActor().clear();
1165 followerActor.underlyingActor().clear();
1167 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1168 TimeUnit.MILLISECONDS);
1170 leader.handleMessage(leaderActor, new SendHeartBeat());
1172 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1174 assertEquals(2, appendEntries.getLeaderCommit());
1175 assertEquals(0, appendEntries.getEntries().size());
1176 assertEquals(2, appendEntries.getPrevLogIndex());
1178 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1180 assertEquals(2, appendEntriesReply.getLogLastIndex());
1181 assertEquals(1, appendEntriesReply.getLogLastTerm());
1183 assertEquals(2, followerActorContext.getCommitIndex());
1189 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1190 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1192 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1193 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1194 new FiniteDuration(1000, TimeUnit.SECONDS));
1196 leaderActorContext.setReplicatedLog(
1197 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1198 long leaderCommitIndex = 2;
1199 leaderActorContext.setCommitIndex(leaderCommitIndex);
1200 leaderActorContext.setLastApplied(leaderCommitIndex);
1202 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1203 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1205 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1207 followerActorContext.setReplicatedLog(
1208 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1209 followerActorContext.setCommitIndex(0);
1210 followerActorContext.setLastApplied(0);
1212 Follower follower = new Follower(followerActorContext);
1213 followerActor.underlyingActor().setBehavior(follower);
1215 leader = new Leader(leaderActorContext);
1217 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1218 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1220 MessageCollectorActor.clearMessages(followerActor);
1221 MessageCollectorActor.clearMessages(leaderActor);
1223 // Verify initial AppendEntries sent with the leader's current commit index.
1224 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1225 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1226 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1228 leaderActor.underlyingActor().setBehavior(leader);
1230 leader.handleMessage(followerActor, appendEntriesReply);
1232 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1233 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1235 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1236 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1237 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1239 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1240 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1241 appendEntries.getEntries().get(0).getData());
1242 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1243 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1244 appendEntries.getEntries().get(1).getData());
1246 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1247 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1249 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1251 ApplyState applyState = applyStateList.get(0);
1252 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1253 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1254 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1255 applyState.getReplicatedLogEntry().getData());
1257 applyState = applyStateList.get(1);
1258 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1259 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1260 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1261 applyState.getReplicatedLogEntry().getData());
1263 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1264 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1268 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1269 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1271 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1272 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1273 new FiniteDuration(1000, TimeUnit.SECONDS));
1275 leaderActorContext.setReplicatedLog(
1276 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1277 long leaderCommitIndex = 1;
1278 leaderActorContext.setCommitIndex(leaderCommitIndex);
1279 leaderActorContext.setLastApplied(leaderCommitIndex);
1281 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1282 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1284 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1286 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1287 followerActorContext.setCommitIndex(-1);
1288 followerActorContext.setLastApplied(-1);
1290 Follower follower = new Follower(followerActorContext);
1291 followerActor.underlyingActor().setBehavior(follower);
1293 leader = new Leader(leaderActorContext);
1295 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1296 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1298 MessageCollectorActor.clearMessages(followerActor);
1299 MessageCollectorActor.clearMessages(leaderActor);
1301 // Verify initial AppendEntries sent with the leader's current commit index.
1302 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1303 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1304 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1306 leaderActor.underlyingActor().setBehavior(leader);
1308 leader.handleMessage(followerActor, appendEntriesReply);
1310 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1311 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1313 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1314 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1315 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1317 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1318 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1319 appendEntries.getEntries().get(0).getData());
1320 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1321 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1322 appendEntries.getEntries().get(1).getData());
1324 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1325 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1327 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1329 ApplyState applyState = applyStateList.get(0);
1330 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1331 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1332 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1333 applyState.getReplicatedLogEntry().getData());
1335 applyState = applyStateList.get(1);
1336 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1337 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1338 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1339 applyState.getReplicatedLogEntry().getData());
1341 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1342 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1346 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1347 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1349 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1350 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1351 new FiniteDuration(1000, TimeUnit.SECONDS));
1353 leaderActorContext.setReplicatedLog(
1354 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1355 long leaderCommitIndex = 1;
1356 leaderActorContext.setCommitIndex(leaderCommitIndex);
1357 leaderActorContext.setLastApplied(leaderCommitIndex);
1359 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1360 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1362 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1364 followerActorContext.setReplicatedLog(
1365 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1366 followerActorContext.setCommitIndex(-1);
1367 followerActorContext.setLastApplied(-1);
1369 Follower follower = new Follower(followerActorContext);
1370 followerActor.underlyingActor().setBehavior(follower);
1372 leader = new Leader(leaderActorContext);
1374 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1375 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1377 MessageCollectorActor.clearMessages(followerActor);
1378 MessageCollectorActor.clearMessages(leaderActor);
1380 // Verify initial AppendEntries sent with the leader's current commit index.
1381 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1382 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1383 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1385 leaderActor.underlyingActor().setBehavior(leader);
1387 leader.handleMessage(followerActor, appendEntriesReply);
1389 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1390 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1392 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1393 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1394 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1396 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1397 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1398 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1399 appendEntries.getEntries().get(0).getData());
1400 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1401 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1402 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1403 appendEntries.getEntries().get(1).getData());
1405 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1406 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1408 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1410 ApplyState applyState = applyStateList.get(0);
1411 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1412 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1413 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1414 applyState.getReplicatedLogEntry().getData());
1416 applyState = applyStateList.get(1);
1417 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1418 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1419 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1420 applyState.getReplicatedLogEntry().getData());
1422 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1423 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1424 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1428 public void testHandleAppendEntriesReplySuccess() throws Exception {
1429 logStart("testHandleAppendEntriesReplySuccess");
1431 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1433 leaderActorContext.setReplicatedLog(
1434 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1436 leaderActorContext.setCommitIndex(1);
1437 leaderActorContext.setLastApplied(1);
1438 leaderActorContext.getTermInformation().update(1, "leader");
1440 leader = new Leader(leaderActorContext);
1442 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1444 short payloadVersion = 5;
1445 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1447 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1449 assertEquals(RaftState.Leader, raftActorBehavior.state());
1451 assertEquals(2, leaderActorContext.getCommitIndex());
1453 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1454 leaderActor, ApplyJournalEntries.class);
1456 assertEquals(2, leaderActorContext.getLastApplied());
1458 assertEquals(2, applyJournalEntries.getToIndex());
1460 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1463 assertEquals(1,applyStateList.size());
1465 ApplyState applyState = applyStateList.get(0);
1467 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1469 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1470 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1474 public void testHandleAppendEntriesReplyUnknownFollower(){
1475 logStart("testHandleAppendEntriesReplyUnknownFollower");
1477 MockRaftActorContext leaderActorContext = createActorContext();
1479 leader = new Leader(leaderActorContext);
1481 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1483 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1485 assertEquals(RaftState.Leader, raftActorBehavior.state());
1489 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1490 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1492 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1493 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1494 new FiniteDuration(1000, TimeUnit.SECONDS));
1495 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2);
1497 leaderActorContext.setReplicatedLog(
1498 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1499 long leaderCommitIndex = 3;
1500 leaderActorContext.setCommitIndex(leaderCommitIndex);
1501 leaderActorContext.setLastApplied(leaderCommitIndex);
1503 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1504 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1505 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1506 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1508 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1510 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1511 followerActorContext.setCommitIndex(-1);
1512 followerActorContext.setLastApplied(-1);
1514 Follower follower = new Follower(followerActorContext);
1515 followerActor.underlyingActor().setBehavior(follower);
1517 leader = new Leader(leaderActorContext);
1519 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1520 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1522 MessageCollectorActor.clearMessages(followerActor);
1523 MessageCollectorActor.clearMessages(leaderActor);
1525 // Verify initial AppendEntries sent with the leader's current commit index.
1526 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1527 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1528 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1530 leaderActor.underlyingActor().setBehavior(leader);
1532 leader.handleMessage(followerActor, appendEntriesReply);
1534 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1535 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1537 appendEntries = appendEntriesList.get(0);
1538 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1539 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1540 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1542 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1543 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1544 appendEntries.getEntries().get(0).getData());
1545 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1546 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1547 appendEntries.getEntries().get(1).getData());
1549 appendEntries = appendEntriesList.get(1);
1550 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1551 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1552 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1554 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1555 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1556 appendEntries.getEntries().get(0).getData());
1557 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1558 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1559 appendEntries.getEntries().get(1).getData());
1561 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1562 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1564 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1566 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1567 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1571 public void testHandleRequestVoteReply(){
1572 logStart("testHandleRequestVoteReply");
1574 MockRaftActorContext leaderActorContext = createActorContext();
1576 leader = new Leader(leaderActorContext);
1578 // Should be a no-op.
1579 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1580 new RequestVoteReply(1, true));
1582 assertEquals(RaftState.Leader, raftActorBehavior.state());
1584 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1586 assertEquals(RaftState.Leader, raftActorBehavior.state());
1590 public void testIsolatedLeaderCheckNoFollowers() {
1591 logStart("testIsolatedLeaderCheckNoFollowers");
1593 MockRaftActorContext leaderActorContext = createActorContext();
1595 leader = new Leader(leaderActorContext);
1596 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1597 Assert.assertTrue(behavior instanceof Leader);
1601 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1602 logStart("testIsolatedLeaderCheckTwoFollowers");
1604 new JavaTestKit(getSystem()) {{
1606 ActorRef followerActor1 = getTestActor();
1607 ActorRef followerActor2 = getTestActor();
1609 MockRaftActorContext leaderActorContext = createActorContext();
1611 Map<String, String> peerAddresses = new HashMap<>();
1612 peerAddresses.put("follower-1", followerActor1.path().toString());
1613 peerAddresses.put("follower-2", followerActor2.path().toString());
1615 leaderActorContext.setPeerAddresses(peerAddresses);
1617 leader = new Leader(leaderActorContext);
1619 leader.markFollowerActive("follower-1");
1620 leader.markFollowerActive("follower-2");
1621 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1622 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1623 behavior instanceof Leader);
1625 // kill 1 follower and verify if that got killed
1626 final JavaTestKit probe = new JavaTestKit(getSystem());
1627 probe.watch(followerActor1);
1628 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1629 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1630 assertEquals(termMsg1.getActor(), followerActor1);
1632 leader.markFollowerInActive("follower-1");
1633 leader.markFollowerActive("follower-2");
1634 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1635 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1636 behavior instanceof Leader);
1638 // kill 2nd follower and leader should change to Isolated leader
1639 followerActor2.tell(PoisonPill.getInstance(), null);
1640 probe.watch(followerActor2);
1641 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1642 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1643 assertEquals(termMsg2.getActor(), followerActor2);
1645 leader.markFollowerInActive("follower-2");
1646 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1647 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1648 behavior instanceof IsolatedLeader);
1653 public void testLaggingFollowerStarvation() throws Exception {
1654 logStart("testLaggingFollowerStarvation");
1655 new JavaTestKit(getSystem()) {{
1656 String leaderActorId = actorFactory.generateActorId("leader");
1657 String follower1ActorId = actorFactory.generateActorId("follower");
1658 String follower2ActorId = actorFactory.generateActorId("follower");
1660 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1661 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1662 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1663 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1665 MockRaftActorContext leaderActorContext =
1666 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1668 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1669 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1670 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1672 leaderActorContext.setConfigParams(configParams);
1674 leaderActorContext.setReplicatedLog(
1675 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1677 Map<String, String> peerAddresses = new HashMap<>();
1678 peerAddresses.put(follower1ActorId,
1679 follower1Actor.path().toString());
1680 peerAddresses.put(follower2ActorId,
1681 follower2Actor.path().toString());
1683 leaderActorContext.setPeerAddresses(peerAddresses);
1684 leaderActorContext.getTermInformation().update(1, leaderActorId);
1686 RaftActorBehavior leader = createBehavior(leaderActorContext);
1688 leaderActor.underlyingActor().setBehavior(leader);
1690 for(int i=1;i<6;i++) {
1691 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1692 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1693 assertTrue(newBehavior == leader);
1694 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1697 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1698 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1700 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1701 heartbeats.size() > 1);
1703 // Check if follower-2 got AppendEntries during this time and was not starved
1704 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1706 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1707 appendEntries.size() > 1);
1713 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1714 ActorRef actorRef, RaftRPC rpc) throws Exception {
1715 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1716 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1719 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1721 private final long electionTimeOutIntervalMillis;
1722 private final int snapshotChunkSize;
/**
 * Creates config params that report the supplied values from
 * {@code getElectionTimeOutInterval()} and {@code getSnapshotChunkSize()}.
 *
 * @param electionTimeOutIntervalMillis election timeout to report, in milliseconds
 * @param snapshotChunkSize snapshot chunk size to report
 */
public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
    this.snapshotChunkSize = snapshotChunkSize;
    this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
}
1731 public FiniteDuration getElectionTimeOutInterval() {
1732 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1736 public int getSnapshotChunkSize() {
1737 return snapshotChunkSize;