2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Mockito.mock;
15 import static org.mockito.Mockito.never;
16 import static org.mockito.Mockito.verify;
17 import akka.actor.ActorRef;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.collect.ImmutableMap;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import com.google.protobuf.ByteString;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.List;
30 import java.util.concurrent.TimeUnit;
31 import org.junit.After;
32 import org.junit.Assert;
33 import org.junit.Test;
34 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
35 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
36 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
37 import org.opendaylight.controller.cluster.raft.RaftActorContext;
38 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
39 import org.opendaylight.controller.cluster.raft.RaftState;
40 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
41 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
42 import org.opendaylight.controller.cluster.raft.SerializationUtils;
43 import org.opendaylight.controller.cluster.raft.Snapshot;
44 import org.opendaylight.controller.cluster.raft.VotingState;
45 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
46 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
47 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
48 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
49 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
50 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
51 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
53 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
54 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
56 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
57 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
58 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
59 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
60 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
61 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
62 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
63 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
64 import scala.concurrent.duration.FiniteDuration;
66 public class LeaderTest extends AbstractLeaderTest {
68 static final String FOLLOWER_ID = "follower";
69 public static final String LEADER_ID = "leader";
71 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
72 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
74 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
75 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
77 private Leader leader;
78 private final short payloadVersion = 5;
82 public void tearDown() throws Exception {
91 public void testHandleMessageForUnknownMessage() throws Exception {
92 logStart("testHandleMessageForUnknownMessage");
94 leader = new Leader(createActorContext());
96 // handle message should return the Leader state when it receives an
98 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
99 Assert.assertTrue(behavior instanceof Leader);
103 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
104 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
106 MockRaftActorContext actorContext = createActorContextWithFollower();
107 short payloadVersion = (short)5;
108 actorContext.setPayloadVersion(payloadVersion);
111 actorContext.getTermInformation().update(term, "");
113 leader = new Leader(actorContext);
115 // Leader should send an immediate heartbeat with no entries as follower is inactive.
116 long lastIndex = actorContext.getReplicatedLog().lastIndex();
117 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
118 assertEquals("getTerm", term, appendEntries.getTerm());
119 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
120 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
121 assertEquals("Entries size", 0, appendEntries.getEntries().size());
122 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
124 // The follower would normally reply - simulate that explicitly here.
125 leader.handleMessage(followerActor, new AppendEntriesReply(
126 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
127 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
129 followerActor.underlyingActor().clear();
131 // Sleep for the heartbeat interval so AppendEntries is sent.
132 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
133 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
135 leader.handleMessage(leaderActor, new SendHeartBeat());
137 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
138 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
139 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
140 assertEquals("Entries size", 1, appendEntries.getEntries().size());
141 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
142 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
143 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
147 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
148 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
149 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
151 actorContext.getReplicatedLog().append(newEntry);
152 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
156 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
157 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
159 MockRaftActorContext actorContext = createActorContextWithFollower();
162 actorContext.getTermInformation().update(term, "");
164 leader = new Leader(actorContext);
166 // Leader will send an immediate heartbeat - ignore it.
167 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
169 // The follower would normally reply - simulate that explicitly here.
170 long lastIndex = actorContext.getReplicatedLog().lastIndex();
171 leader.handleMessage(followerActor, new AppendEntriesReply(
172 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
173 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
175 followerActor.underlyingActor().clear();
177 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
179 // State should not change
180 assertTrue(raftBehavior instanceof Leader);
182 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
183 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
184 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
185 assertEquals("Entries size", 1, appendEntries.getEntries().size());
186 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
187 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
188 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
189 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
193 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
194 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
196 MockRaftActorContext actorContext = createActorContextWithFollower();
197 actorContext.setRaftPolicy(createRaftPolicy(true, true));
200 actorContext.getTermInformation().update(term, "");
202 leader = new Leader(actorContext);
204 // Leader will send an immediate heartbeat - ignore it.
205 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
207 // The follower would normally reply - simulate that explicitly here.
208 long lastIndex = actorContext.getReplicatedLog().lastIndex();
209 leader.handleMessage(followerActor, new AppendEntriesReply(
210 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
211 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
213 followerActor.underlyingActor().clear();
215 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
217 // State should not change
218 assertTrue(raftBehavior instanceof Leader);
220 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
221 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
222 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
223 assertEquals("Entries size", 1, appendEntries.getEntries().size());
224 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
225 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
226 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
227 assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
231 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
232 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
234 MockRaftActorContext actorContext = createActorContextWithFollower();
235 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
237 public FiniteDuration getHeartBeatInterval() {
238 return FiniteDuration.apply(5, TimeUnit.SECONDS);
243 actorContext.getTermInformation().update(term, "");
245 leader = new Leader(actorContext);
247 // Leader will send an immediate heartbeat - ignore it.
248 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
250 // The follower would normally reply - simulate that explicitly here.
251 long lastIndex = actorContext.getReplicatedLog().lastIndex();
252 leader.handleMessage(followerActor, new AppendEntriesReply(
253 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
254 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
256 followerActor.underlyingActor().clear();
258 for(int i=0;i<5;i++) {
259 sendReplicate(actorContext, lastIndex+i+1);
262 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
263 // We expect only 1 message to be sent because of two reasons,
264 // - an append entries reply was not received
265 // - the heartbeat interval has not expired
266 // In this scenario if multiple messages are sent they would likely be duplicates
267 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
271 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
272 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
274 MockRaftActorContext actorContext = createActorContextWithFollower();
275 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
277 public FiniteDuration getHeartBeatInterval() {
278 return FiniteDuration.apply(5, TimeUnit.SECONDS);
283 actorContext.getTermInformation().update(term, "");
285 leader = new Leader(actorContext);
287 // Leader will send an immediate heartbeat - ignore it.
288 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
290 // The follower would normally reply - simulate that explicitly here.
291 long lastIndex = actorContext.getReplicatedLog().lastIndex();
292 leader.handleMessage(followerActor, new AppendEntriesReply(
293 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
294 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
296 followerActor.underlyingActor().clear();
298 for(int i=0;i<3;i++) {
299 sendReplicate(actorContext, lastIndex+i+1);
300 leader.handleMessage(followerActor, new AppendEntriesReply(
301 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
305 for(int i=3;i<5;i++) {
306 sendReplicate(actorContext, lastIndex + i + 1);
309 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
310 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
311 // get sent to the follower - but not the 5th
312 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
314 for(int i=0;i<4;i++) {
315 long expected = allMessages.get(i).getEntries().get(0).getIndex();
316 assertEquals(expected, i+2);
321 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
322 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
324 MockRaftActorContext actorContext = createActorContextWithFollower();
325 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
327 public FiniteDuration getHeartBeatInterval() {
328 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
333 actorContext.getTermInformation().update(term, "");
335 leader = new Leader(actorContext);
337 // Leader will send an immediate heartbeat - ignore it.
338 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
340 // The follower would normally reply - simulate that explicitly here.
341 long lastIndex = actorContext.getReplicatedLog().lastIndex();
342 leader.handleMessage(followerActor, new AppendEntriesReply(
343 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
344 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
346 followerActor.underlyingActor().clear();
348 sendReplicate(actorContext, lastIndex+1);
350 // Wait slightly longer than heartbeat duration
351 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
353 leader.handleMessage(leaderActor, new SendHeartBeat());
355 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
356 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
358 assertEquals(1, allMessages.get(0).getEntries().size());
359 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
360 assertEquals(1, allMessages.get(1).getEntries().size());
361 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
366 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
367 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
369 MockRaftActorContext actorContext = createActorContextWithFollower();
370 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
372 public FiniteDuration getHeartBeatInterval() {
373 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
378 actorContext.getTermInformation().update(term, "");
380 leader = new Leader(actorContext);
382 // Leader will send an immediate heartbeat - ignore it.
383 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
385 // The follower would normally reply - simulate that explicitly here.
386 long lastIndex = actorContext.getReplicatedLog().lastIndex();
387 leader.handleMessage(followerActor, new AppendEntriesReply(
388 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
389 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
391 followerActor.underlyingActor().clear();
393 for(int i=0;i<3;i++) {
394 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
395 leader.handleMessage(leaderActor, new SendHeartBeat());
398 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
399 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
403 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
404 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
406 MockRaftActorContext actorContext = createActorContextWithFollower();
407 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
409 public FiniteDuration getHeartBeatInterval() {
410 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
415 actorContext.getTermInformation().update(term, "");
417 leader = new Leader(actorContext);
419 // Leader will send an immediate heartbeat - ignore it.
420 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
422 // The follower would normally reply - simulate that explicitly here.
423 long lastIndex = actorContext.getReplicatedLog().lastIndex();
424 leader.handleMessage(followerActor, new AppendEntriesReply(
425 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
426 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
428 followerActor.underlyingActor().clear();
430 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
431 leader.handleMessage(leaderActor, new SendHeartBeat());
432 sendReplicate(actorContext, lastIndex+1);
434 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
435 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
437 assertEquals(0, allMessages.get(0).getEntries().size());
438 assertEquals(1, allMessages.get(1).getEntries().size());
443 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
444 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
446 MockRaftActorContext actorContext = createActorContext();
448 leader = new Leader(actorContext);
450 actorContext.setLastApplied(0);
452 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
453 long term = actorContext.getTermInformation().getCurrentTerm();
454 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
455 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
457 actorContext.getReplicatedLog().append(newEntry);
459 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
460 new Replicate(leaderActor, "state-id", newEntry));
462 // State should not change
463 assertTrue(raftBehavior instanceof Leader);
465 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
467 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
468 // one since lastApplied state is 0.
469 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
470 leaderActor, ApplyState.class);
471 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
473 for(int i = 0; i <= newLogIndex - 1; i++ ) {
474 ApplyState applyState = applyStateList.get(i);
475 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
476 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
479 ApplyState last = applyStateList.get((int) newLogIndex - 1);
480 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
481 assertEquals("getIdentifier", "state-id", last.getIdentifier());
485 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
486 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
488 MockRaftActorContext actorContext = createActorContextWithFollower();
490 Map<String, String> leadersSnapshot = new HashMap<>();
491 leadersSnapshot.put("1", "A");
492 leadersSnapshot.put("2", "B");
493 leadersSnapshot.put("3", "C");
496 actorContext.getReplicatedLog().removeFrom(0);
498 final int commitIndex = 3;
499 final int snapshotIndex = 2;
500 final int newEntryIndex = 4;
501 final int snapshotTerm = 1;
502 final int currentTerm = 2;
504 // set the snapshot variables in replicatedlog
505 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
506 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
507 actorContext.setCommitIndex(commitIndex);
508 //set follower timeout to 2 mins, helps during debugging
509 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
511 leader = new Leader(actorContext);
513 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
514 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
517 ReplicatedLogImplEntry entry =
518 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
519 new MockRaftActorContext.MockPayload("D"));
521 //update follower timestamp
522 leader.markFollowerActive(FOLLOWER_ID);
524 ByteString bs = toByteString(leadersSnapshot);
525 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
526 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
527 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
528 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
530 //send first chunk and no InstallSnapshotReply received yet
532 fts.incrementChunkIndex();
534 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
535 TimeUnit.MILLISECONDS);
537 leader.handleMessage(leaderActor, new SendHeartBeat());
539 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
541 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
543 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
545 //InstallSnapshotReply received
546 fts.markSendStatus(true);
548 leader.handleMessage(leaderActor, new SendHeartBeat());
550 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
552 assertEquals(commitIndex, is.getLastIncludedIndex());
556 public void testSendAppendEntriesSnapshotScenario() throws Exception {
557 logStart("testSendAppendEntriesSnapshotScenario");
559 MockRaftActorContext actorContext = createActorContextWithFollower();
561 Map<String, String> leadersSnapshot = new HashMap<>();
562 leadersSnapshot.put("1", "A");
563 leadersSnapshot.put("2", "B");
564 leadersSnapshot.put("3", "C");
567 actorContext.getReplicatedLog().removeFrom(0);
569 final int followersLastIndex = 2;
570 final int snapshotIndex = 3;
571 final int newEntryIndex = 4;
572 final int snapshotTerm = 1;
573 final int currentTerm = 2;
575 // set the snapshot variables in replicatedlog
576 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
577 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
578 actorContext.setCommitIndex(followersLastIndex);
580 leader = new Leader(actorContext);
582 // Leader will send an immediate heartbeat - ignore it.
583 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
586 ReplicatedLogImplEntry entry =
587 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
588 new MockRaftActorContext.MockPayload("D"));
590 actorContext.getReplicatedLog().append(entry);
592 //update follower timestamp
593 leader.markFollowerActive(FOLLOWER_ID);
595 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
596 RaftActorBehavior raftBehavior = leader.handleMessage(
597 leaderActor, new Replicate(null, "state-id", entry));
599 assertTrue(raftBehavior instanceof Leader);
601 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
605 public void testInitiateInstallSnapshot() throws Exception {
606 logStart("testInitiateInstallSnapshot");
608 MockRaftActorContext actorContext = createActorContextWithFollower();
611 actorContext.getReplicatedLog().removeFrom(0);
613 final int followersLastIndex = 2;
614 final int snapshotIndex = 3;
615 final int newEntryIndex = 4;
616 final int snapshotTerm = 1;
617 final int currentTerm = 2;
619 // set the snapshot variables in replicatedlog
620 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
621 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
622 actorContext.setLastApplied(3);
623 actorContext.setCommitIndex(followersLastIndex);
625 leader = new Leader(actorContext);
627 // Leader will send an immediate heartbeat - ignore it.
628 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
630 // set the snapshot as absent and check if capture-snapshot is invoked.
631 leader.setSnapshot(null);
634 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
635 new MockRaftActorContext.MockPayload("D"));
637 actorContext.getReplicatedLog().append(entry);
639 //update follower timestamp
640 leader.markFollowerActive(FOLLOWER_ID);
642 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
644 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
646 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
648 assertTrue(cs.isInstallSnapshotInitiated());
649 assertEquals(3, cs.getLastAppliedIndex());
650 assertEquals(1, cs.getLastAppliedTerm());
651 assertEquals(4, cs.getLastIndex());
652 assertEquals(2, cs.getLastTerm());
654 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
655 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
657 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
661 public void testInitiateForceInstallSnapshot() throws Exception {
662 logStart("testInitiateForceInstallSnapshot");
664 MockRaftActorContext actorContext = createActorContextWithFollower();
666 final int followersLastIndex = 2;
667 final int snapshotIndex = -1;
668 final int newEntryIndex = 4;
669 final int snapshotTerm = -1;
670 final int currentTerm = 2;
672 // set the snapshot variables in replicatedlog
673 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
674 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
675 actorContext.setLastApplied(3);
676 actorContext.setCommitIndex(followersLastIndex);
678 actorContext.getReplicatedLog().removeFrom(0);
680 leader = new Leader(actorContext);
682 // Leader will send an immediate heartbeat - ignore it.
683 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
685 // set the snapshot as absent and check if capture-snapshot is invoked.
686 leader.setSnapshot(null);
688 for(int i=0;i<4;i++) {
689 actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
690 new MockRaftActorContext.MockPayload("X" + i)));
694 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
695 new MockRaftActorContext.MockPayload("D"));
697 actorContext.getReplicatedLog().append(entry);
699 //update follower timestamp
700 leader.markFollowerActive(FOLLOWER_ID);
702 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
703 // installed with a SendInstallSnapshot
704 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
706 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
708 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
710 assertTrue(cs.isInstallSnapshotInitiated());
711 assertEquals(3, cs.getLastAppliedIndex());
712 assertEquals(1, cs.getLastAppliedTerm());
713 assertEquals(4, cs.getLastIndex());
714 assertEquals(2, cs.getLastTerm());
716 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
717 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
719 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
724 public void testInstallSnapshot() throws Exception {
725 logStart("testInstallSnapshot");
727 MockRaftActorContext actorContext = createActorContextWithFollower();
729 Map<String, String> leadersSnapshot = new HashMap<>();
730 leadersSnapshot.put("1", "A");
731 leadersSnapshot.put("2", "B");
732 leadersSnapshot.put("3", "C");
735 actorContext.getReplicatedLog().removeFrom(0);
737 final int lastAppliedIndex = 3;
738 final int snapshotIndex = 2;
739 final int snapshotTerm = 1;
740 final int currentTerm = 2;
742 // set the snapshot variables in replicatedlog
743 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
744 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
745 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
746 actorContext.setCommitIndex(lastAppliedIndex);
747 actorContext.setLastApplied(lastAppliedIndex);
749 leader = new Leader(actorContext);
751 // Initial heartbeat.
752 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
754 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
755 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
757 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
758 Collections.<ReplicatedLogEntry>emptyList(),
759 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
761 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
763 assertTrue(raftBehavior instanceof Leader);
765 // check if installsnapshot gets called with the correct values.
767 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
769 assertNotNull(installSnapshot.getData());
770 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
771 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
773 assertEquals(currentTerm, installSnapshot.getTerm());
777 public void testForceInstallSnapshot() throws Exception {
778 logStart("testForceInstallSnapshot");
780 MockRaftActorContext actorContext = createActorContextWithFollower();
782 Map<String, String> leadersSnapshot = new HashMap<>();
783 leadersSnapshot.put("1", "A");
784 leadersSnapshot.put("2", "B");
785 leadersSnapshot.put("3", "C");
787 final int lastAppliedIndex = 3;
788 final int snapshotIndex = -1;
789 final int snapshotTerm = -1;
790 final int currentTerm = 2;
792 // set the snapshot variables in replicatedlog
793 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
794 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
795 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
796 actorContext.setCommitIndex(lastAppliedIndex);
797 actorContext.setLastApplied(lastAppliedIndex);
799 leader = new Leader(actorContext);
801 // Initial heartbeat.
802 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
804 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
805 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
807 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
808 Collections.<ReplicatedLogEntry>emptyList(),
809 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
811 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
813 assertTrue(raftBehavior instanceof Leader);
815 // check if installsnapshot gets called with the correct values.
817 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
819 assertNotNull(installSnapshot.getData());
820 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
821 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
823 assertEquals(currentTerm, installSnapshot.getTerm());
827 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
828 logStart("testHandleInstallSnapshotReplyLastChunk");
830 MockRaftActorContext actorContext = createActorContextWithFollower();
832 final int commitIndex = 3;
833 final int snapshotIndex = 2;
834 final int snapshotTerm = 1;
835 final int currentTerm = 2;
837 actorContext.setCommitIndex(commitIndex);
839 leader = new Leader(actorContext);
841 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
842 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
844 // Ignore initial heartbeat.
845 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
847 Map<String, String> leadersSnapshot = new HashMap<>();
848 leadersSnapshot.put("1", "A");
849 leadersSnapshot.put("2", "B");
850 leadersSnapshot.put("3", "C");
852 // set the snapshot variables in replicatedlog
854 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
855 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
856 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
858 ByteString bs = toByteString(leadersSnapshot);
859 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
860 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
861 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
862 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
863 while(!fts.isLastChunk(fts.getChunkIndex())) {
865 fts.incrementChunkIndex();
869 actorContext.getReplicatedLog().removeFrom(0);
871 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
872 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
874 assertTrue(raftBehavior instanceof Leader);
876 assertEquals(0, leader.followerSnapshotSize());
877 assertEquals(1, leader.followerLogSize());
878 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
880 assertEquals(commitIndex, fli.getMatchIndex());
881 assertEquals(commitIndex + 1, fli.getNextIndex());
885 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
886 logStart("testSendSnapshotfromInstallSnapshotReply");
888 MockRaftActorContext actorContext = createActorContextWithFollower();
890 final int commitIndex = 3;
891 final int snapshotIndex = 2;
892 final int snapshotTerm = 1;
893 final int currentTerm = 2;
895 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
897 public int getSnapshotChunkSize() {
901 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
902 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
904 actorContext.setConfigParams(configParams);
905 actorContext.setCommitIndex(commitIndex);
907 leader = new Leader(actorContext);
909 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
910 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
912 Map<String, String> leadersSnapshot = new HashMap<>();
913 leadersSnapshot.put("1", "A");
914 leadersSnapshot.put("2", "B");
915 leadersSnapshot.put("3", "C");
917 // set the snapshot variables in replicatedlog
918 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
919 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
920 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
922 ByteString bs = toByteString(leadersSnapshot);
923 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
924 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
925 leader.setSnapshot(snapshot);
927 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
929 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
931 assertEquals(1, installSnapshot.getChunkIndex());
932 assertEquals(3, installSnapshot.getTotalChunks());
934 followerActor.underlyingActor().clear();
935 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
936 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
938 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
940 assertEquals(2, installSnapshot.getChunkIndex());
941 assertEquals(3, installSnapshot.getTotalChunks());
943 followerActor.underlyingActor().clear();
944 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
945 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
947 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
949 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
950 followerActor.underlyingActor().clear();
951 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
952 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
954 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
956 Assert.assertNull(installSnapshot);
961 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
962 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
964 MockRaftActorContext actorContext = createActorContextWithFollower();
966 final int commitIndex = 3;
967 final int snapshotIndex = 2;
968 final int snapshotTerm = 1;
969 final int currentTerm = 2;
971 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
973 public int getSnapshotChunkSize() {
978 actorContext.setCommitIndex(commitIndex);
980 leader = new Leader(actorContext);
982 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
983 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
985 Map<String, String> leadersSnapshot = new HashMap<>();
986 leadersSnapshot.put("1", "A");
987 leadersSnapshot.put("2", "B");
988 leadersSnapshot.put("3", "C");
990 // set the snapshot variables in replicatedlog
991 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
992 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
993 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
995 ByteString bs = toByteString(leadersSnapshot);
996 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
997 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
998 leader.setSnapshot(snapshot);
1000 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1001 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1003 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1005 assertEquals(1, installSnapshot.getChunkIndex());
1006 assertEquals(3, installSnapshot.getTotalChunks());
1008 followerActor.underlyingActor().clear();
1010 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1011 FOLLOWER_ID, -1, false));
1013 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1014 TimeUnit.MILLISECONDS);
1016 leader.handleMessage(leaderActor, new SendHeartBeat());
1018 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1020 assertEquals(1, installSnapshot.getChunkIndex());
1021 assertEquals(3, installSnapshot.getTotalChunks());
1025 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1026 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1028 MockRaftActorContext actorContext = createActorContextWithFollower();
1030 final int commitIndex = 3;
1031 final int snapshotIndex = 2;
1032 final int snapshotTerm = 1;
1033 final int currentTerm = 2;
1035 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1037 public int getSnapshotChunkSize() {
1042 actorContext.setCommitIndex(commitIndex);
1044 leader = new Leader(actorContext);
1046 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1047 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1049 Map<String, String> leadersSnapshot = new HashMap<>();
1050 leadersSnapshot.put("1", "A");
1051 leadersSnapshot.put("2", "B");
1052 leadersSnapshot.put("3", "C");
1054 // set the snapshot variables in replicatedlog
1055 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1056 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1057 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1059 ByteString bs = toByteString(leadersSnapshot);
1060 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1061 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1062 leader.setSnapshot(snapshot);
1064 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1066 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1068 assertEquals(1, installSnapshot.getChunkIndex());
1069 assertEquals(3, installSnapshot.getTotalChunks());
1070 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
1072 int hashCode = installSnapshot.getData().hashCode();
1074 followerActor.underlyingActor().clear();
1076 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1077 FOLLOWER_ID, 1, true));
1079 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1081 assertEquals(2, installSnapshot.getChunkIndex());
1082 assertEquals(3, installSnapshot.getTotalChunks());
1083 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1087 public void testFollowerToSnapshotLogic() {
1088 logStart("testFollowerToSnapshotLogic");
1090 MockRaftActorContext actorContext = createActorContext();
1092 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1094 public int getSnapshotChunkSize() {
1099 leader = new Leader(actorContext);
1101 Map<String, String> leadersSnapshot = new HashMap<>();
1102 leadersSnapshot.put("1", "A");
1103 leadersSnapshot.put("2", "B");
1104 leadersSnapshot.put("3", "C");
1106 ByteString bs = toByteString(leadersSnapshot);
1107 byte[] barray = bs.toByteArray();
1109 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1110 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1112 assertEquals(bs.size(), barray.length);
1115 for (int i=0; i < barray.length; i = i + 50) {
1119 if (i + 50 > barray.length) {
1123 ByteString chunk = fts.getNextChunk();
1124 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
1125 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1127 fts.markSendStatus(true);
1128 if (!fts.isLastChunk(chunkIndex)) {
1129 fts.incrementChunkIndex();
1133 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1136 @Override protected RaftActorBehavior createBehavior(
1137 RaftActorContext actorContext) {
1138 return new Leader(actorContext);
1142 protected MockRaftActorContext createActorContext() {
1143 return createActorContext(leaderActor);
1147 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1148 return createActorContext(LEADER_ID, actorRef);
1151 private MockRaftActorContext createActorContextWithFollower() {
1152 MockRaftActorContext actorContext = createActorContext();
1153 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1154 followerActor.path().toString()).build());
1155 return actorContext;
1158 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1159 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1160 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1161 configParams.setElectionTimeoutFactor(100000);
1162 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1163 context.setConfigParams(configParams);
1164 context.setPayloadVersion(payloadVersion);
1168 private MockRaftActorContext createFollowerActorContextWithLeader() {
1169 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1170 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1171 followerConfig.setElectionTimeoutFactor(10000);
1172 followerActorContext.setConfigParams(followerConfig);
1173 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1174 return followerActorContext;
1178 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1179 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1181 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1183 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1185 Follower follower = new Follower(followerActorContext);
1186 followerActor.underlyingActor().setBehavior(follower);
1188 Map<String, String> peerAddresses = new HashMap<>();
1189 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1191 leaderActorContext.setPeerAddresses(peerAddresses);
1193 leaderActorContext.getReplicatedLog().removeFrom(0);
1196 leaderActorContext.setReplicatedLog(
1197 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1199 leaderActorContext.setCommitIndex(1);
1201 followerActorContext.getReplicatedLog().removeFrom(0);
1203 // follower too has the exact same log entries and has the same commit index
1204 followerActorContext.setReplicatedLog(
1205 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1207 followerActorContext.setCommitIndex(1);
1209 leader = new Leader(leaderActorContext);
1211 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1213 assertEquals(1, appendEntries.getLeaderCommit());
1214 assertEquals(0, appendEntries.getEntries().size());
1215 assertEquals(0, appendEntries.getPrevLogIndex());
1217 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1218 leaderActor, AppendEntriesReply.class);
1220 assertEquals(2, appendEntriesReply.getLogLastIndex());
1221 assertEquals(1, appendEntriesReply.getLogLastTerm());
1223 // follower returns its next index
1224 assertEquals(2, appendEntriesReply.getLogLastIndex());
1225 assertEquals(1, appendEntriesReply.getLogLastTerm());
1231 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1232 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1234 MockRaftActorContext leaderActorContext = createActorContext();
1236 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1237 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1239 Follower follower = new Follower(followerActorContext);
1240 followerActor.underlyingActor().setBehavior(follower);
1242 Map<String, String> leaderPeerAddresses = new HashMap<>();
1243 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1245 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1247 leaderActorContext.getReplicatedLog().removeFrom(0);
1249 leaderActorContext.setReplicatedLog(
1250 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1252 leaderActorContext.setCommitIndex(1);
1254 followerActorContext.getReplicatedLog().removeFrom(0);
1256 followerActorContext.setReplicatedLog(
1257 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1259 // follower has the same log entries but its commit index > leaders commit index
1260 followerActorContext.setCommitIndex(2);
1262 leader = new Leader(leaderActorContext);
1264 // Initial heartbeat
1265 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1267 assertEquals(1, appendEntries.getLeaderCommit());
1268 assertEquals(0, appendEntries.getEntries().size());
1269 assertEquals(0, appendEntries.getPrevLogIndex());
1271 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1272 leaderActor, AppendEntriesReply.class);
1274 assertEquals(2, appendEntriesReply.getLogLastIndex());
1275 assertEquals(1, appendEntriesReply.getLogLastTerm());
1277 leaderActor.underlyingActor().setBehavior(follower);
1278 leader.handleMessage(followerActor, appendEntriesReply);
1280 leaderActor.underlyingActor().clear();
1281 followerActor.underlyingActor().clear();
1283 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1284 TimeUnit.MILLISECONDS);
1286 leader.handleMessage(leaderActor, new SendHeartBeat());
1288 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1290 assertEquals(2, appendEntries.getLeaderCommit());
1291 assertEquals(0, appendEntries.getEntries().size());
1292 assertEquals(2, appendEntries.getPrevLogIndex());
1294 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1296 assertEquals(2, appendEntriesReply.getLogLastIndex());
1297 assertEquals(1, appendEntriesReply.getLogLastTerm());
1299 assertEquals(2, followerActorContext.getCommitIndex());
1305 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1306 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1308 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1309 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1310 new FiniteDuration(1000, TimeUnit.SECONDS));
1312 leaderActorContext.setReplicatedLog(
1313 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1314 long leaderCommitIndex = 2;
1315 leaderActorContext.setCommitIndex(leaderCommitIndex);
1316 leaderActorContext.setLastApplied(leaderCommitIndex);
1318 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1319 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1321 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1323 followerActorContext.setReplicatedLog(
1324 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1325 followerActorContext.setCommitIndex(0);
1326 followerActorContext.setLastApplied(0);
1328 Follower follower = new Follower(followerActorContext);
1329 followerActor.underlyingActor().setBehavior(follower);
1331 leader = new Leader(leaderActorContext);
1333 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1334 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1336 MessageCollectorActor.clearMessages(followerActor);
1337 MessageCollectorActor.clearMessages(leaderActor);
1339 // Verify initial AppendEntries sent with the leader's current commit index.
1340 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1341 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1342 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1344 leaderActor.underlyingActor().setBehavior(leader);
1346 leader.handleMessage(followerActor, appendEntriesReply);
1348 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1349 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1351 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1352 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1353 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1355 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1356 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1357 appendEntries.getEntries().get(0).getData());
1358 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1359 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1360 appendEntries.getEntries().get(1).getData());
1362 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1363 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1365 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1367 ApplyState applyState = applyStateList.get(0);
1368 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1369 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1370 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1371 applyState.getReplicatedLogEntry().getData());
1373 applyState = applyStateList.get(1);
1374 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1375 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1376 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1377 applyState.getReplicatedLogEntry().getData());
1379 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1380 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1384 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1385 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1387 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1388 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1389 new FiniteDuration(1000, TimeUnit.SECONDS));
1391 leaderActorContext.setReplicatedLog(
1392 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1393 long leaderCommitIndex = 1;
1394 leaderActorContext.setCommitIndex(leaderCommitIndex);
1395 leaderActorContext.setLastApplied(leaderCommitIndex);
1397 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1398 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1400 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1402 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1403 followerActorContext.setCommitIndex(-1);
1404 followerActorContext.setLastApplied(-1);
1406 Follower follower = new Follower(followerActorContext);
1407 followerActor.underlyingActor().setBehavior(follower);
1409 leader = new Leader(leaderActorContext);
1411 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1412 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1414 MessageCollectorActor.clearMessages(followerActor);
1415 MessageCollectorActor.clearMessages(leaderActor);
1417 // Verify initial AppendEntries sent with the leader's current commit index.
1418 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1419 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1420 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1422 leaderActor.underlyingActor().setBehavior(leader);
1424 leader.handleMessage(followerActor, appendEntriesReply);
1426 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1427 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1429 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1430 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1431 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1433 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1434 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1435 appendEntries.getEntries().get(0).getData());
1436 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1437 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1438 appendEntries.getEntries().get(1).getData());
1440 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1441 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1443 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1445 ApplyState applyState = applyStateList.get(0);
1446 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1447 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1448 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1449 applyState.getReplicatedLogEntry().getData());
1451 applyState = applyStateList.get(1);
1452 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1453 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1454 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1455 applyState.getReplicatedLogEntry().getData());
1457 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1458 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1462 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1463 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1465 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1466 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1467 new FiniteDuration(1000, TimeUnit.SECONDS));
1469 leaderActorContext.setReplicatedLog(
1470 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1471 long leaderCommitIndex = 1;
1472 leaderActorContext.setCommitIndex(leaderCommitIndex);
1473 leaderActorContext.setLastApplied(leaderCommitIndex);
1475 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1476 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1478 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1480 followerActorContext.setReplicatedLog(
1481 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1482 followerActorContext.setCommitIndex(-1);
1483 followerActorContext.setLastApplied(-1);
1485 Follower follower = new Follower(followerActorContext);
1486 followerActor.underlyingActor().setBehavior(follower);
1488 leader = new Leader(leaderActorContext);
1490 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1491 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1493 MessageCollectorActor.clearMessages(followerActor);
1494 MessageCollectorActor.clearMessages(leaderActor);
1496 // Verify initial AppendEntries sent with the leader's current commit index.
1497 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1498 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1499 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1501 leaderActor.underlyingActor().setBehavior(leader);
1503 leader.handleMessage(followerActor, appendEntriesReply);
1505 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1506 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1508 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1509 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1510 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1512 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1513 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1514 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1515 appendEntries.getEntries().get(0).getData());
1516 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1517 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1518 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1519 appendEntries.getEntries().get(1).getData());
1521 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1522 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1524 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1526 ApplyState applyState = applyStateList.get(0);
1527 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1528 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1529 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1530 applyState.getReplicatedLogEntry().getData());
1532 applyState = applyStateList.get(1);
1533 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1534 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1535 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1536 applyState.getReplicatedLogEntry().getData());
1538 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1539 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1540 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1544 public void testHandleAppendEntriesReplyWithNewerTerm(){
1545 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1547 MockRaftActorContext leaderActorContext = createActorContext();
1548 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1549 new FiniteDuration(10000, TimeUnit.SECONDS));
1551 leaderActorContext.setReplicatedLog(
1552 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1554 leader = new Leader(leaderActorContext);
1555 leaderActor.underlyingActor().setBehavior(leader);
1556 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1558 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1560 assertEquals(false, appendEntriesReply.isSuccess());
1561 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1563 MessageCollectorActor.clearMessages(leaderActor);
1567 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1568 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1570 MockRaftActorContext leaderActorContext = createActorContext();
1571 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1572 new FiniteDuration(10000, TimeUnit.SECONDS));
1574 leaderActorContext.setReplicatedLog(
1575 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1576 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1578 leader = new Leader(leaderActorContext);
1579 leaderActor.underlyingActor().setBehavior(leader);
1580 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1582 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1584 assertEquals(false, appendEntriesReply.isSuccess());
1585 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1587 MessageCollectorActor.clearMessages(leaderActor);
1591 public void testHandleAppendEntriesReplySuccess() throws Exception {
1592 logStart("testHandleAppendEntriesReplySuccess");
1594 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1596 leaderActorContext.setReplicatedLog(
1597 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1599 leaderActorContext.setCommitIndex(1);
1600 leaderActorContext.setLastApplied(1);
1601 leaderActorContext.getTermInformation().update(1, "leader");
1603 leader = new Leader(leaderActorContext);
1605 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1607 short payloadVersion = 5;
1608 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1610 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1612 assertEquals(RaftState.Leader, raftActorBehavior.state());
1614 assertEquals(2, leaderActorContext.getCommitIndex());
1616 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1617 leaderActor, ApplyJournalEntries.class);
1619 assertEquals(2, leaderActorContext.getLastApplied());
1621 assertEquals(2, applyJournalEntries.getToIndex());
1623 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1626 assertEquals(1,applyStateList.size());
1628 ApplyState applyState = applyStateList.get(0);
1630 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1632 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1633 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1637 public void testHandleAppendEntriesReplyUnknownFollower(){
1638 logStart("testHandleAppendEntriesReplyUnknownFollower");
1640 MockRaftActorContext leaderActorContext = createActorContext();
1642 leader = new Leader(leaderActorContext);
1644 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1646 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1648 assertEquals(RaftState.Leader, raftActorBehavior.state());
1652 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1653 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1655 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1656 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1657 new FiniteDuration(1000, TimeUnit.SECONDS));
1658 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1660 leaderActorContext.setReplicatedLog(
1661 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1662 long leaderCommitIndex = 3;
1663 leaderActorContext.setCommitIndex(leaderCommitIndex);
1664 leaderActorContext.setLastApplied(leaderCommitIndex);
1666 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1667 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1668 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1669 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1671 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1673 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1674 followerActorContext.setCommitIndex(-1);
1675 followerActorContext.setLastApplied(-1);
1677 Follower follower = new Follower(followerActorContext);
1678 followerActor.underlyingActor().setBehavior(follower);
1680 leader = new Leader(leaderActorContext);
1682 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1683 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1685 MessageCollectorActor.clearMessages(followerActor);
1686 MessageCollectorActor.clearMessages(leaderActor);
1688 // Verify initial AppendEntries sent with the leader's current commit index.
1689 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1690 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1691 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1693 leaderActor.underlyingActor().setBehavior(leader);
1695 leader.handleMessage(followerActor, appendEntriesReply);
1697 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1698 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1700 appendEntries = appendEntriesList.get(0);
1701 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1702 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1703 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1705 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1706 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1707 appendEntries.getEntries().get(0).getData());
1708 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1709 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1710 appendEntries.getEntries().get(1).getData());
1712 appendEntries = appendEntriesList.get(1);
1713 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1714 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1715 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1717 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1718 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1719 appendEntries.getEntries().get(0).getData());
1720 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1721 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1722 appendEntries.getEntries().get(1).getData());
1724 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1725 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1727 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1729 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1730 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1734 public void testHandleRequestVoteReply(){
1735 logStart("testHandleRequestVoteReply");
1737 MockRaftActorContext leaderActorContext = createActorContext();
1739 leader = new Leader(leaderActorContext);
1741 // Should be a no-op.
1742 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1743 new RequestVoteReply(1, true));
1745 assertEquals(RaftState.Leader, raftActorBehavior.state());
1747 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1749 assertEquals(RaftState.Leader, raftActorBehavior.state());
1753 public void testIsolatedLeaderCheckNoFollowers() {
1754 logStart("testIsolatedLeaderCheckNoFollowers");
1756 MockRaftActorContext leaderActorContext = createActorContext();
1758 leader = new Leader(leaderActorContext);
1759 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1760 Assert.assertTrue(behavior instanceof Leader);
1763 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1764 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1765 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1767 MockRaftActorContext leaderActorContext = createActorContext();
1769 Map<String, String> peerAddresses = new HashMap<>();
1770 peerAddresses.put("follower-1", followerActor1.path().toString());
1771 peerAddresses.put("follower-2", followerActor2.path().toString());
1773 leaderActorContext.setPeerAddresses(peerAddresses);
1774 leaderActorContext.setRaftPolicy(raftPolicy);
1776 leader = new Leader(leaderActorContext);
1778 leader.markFollowerActive("follower-1");
1779 leader.markFollowerActive("follower-2");
1780 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1781 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1782 behavior instanceof Leader);
1784 // kill 1 follower and verify if that got killed
1785 final JavaTestKit probe = new JavaTestKit(getSystem());
1786 probe.watch(followerActor1);
1787 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1788 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1789 assertEquals(termMsg1.getActor(), followerActor1);
1791 leader.markFollowerInActive("follower-1");
1792 leader.markFollowerActive("follower-2");
1793 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1794 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1795 behavior instanceof Leader);
1797 // kill 2nd follower and leader should change to Isolated leader
1798 followerActor2.tell(PoisonPill.getInstance(), null);
1799 probe.watch(followerActor2);
1800 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1801 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1802 assertEquals(termMsg2.getActor(), followerActor2);
1804 leader.markFollowerInActive("follower-2");
1805 return leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1809 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1810 logStart("testIsolatedLeaderCheckTwoFollowers");
1812 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1814 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1815 behavior instanceof IsolatedLeader);
1819 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1820 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1822 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1824 Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1825 behavior instanceof Leader);
1829 public void testLaggingFollowerStarvation() throws Exception {
1830 logStart("testLaggingFollowerStarvation");
1831 new JavaTestKit(getSystem()) {{
1832 String leaderActorId = actorFactory.generateActorId("leader");
1833 String follower1ActorId = actorFactory.generateActorId("follower");
1834 String follower2ActorId = actorFactory.generateActorId("follower");
1836 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1837 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1838 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1839 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1841 MockRaftActorContext leaderActorContext =
1842 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1844 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1845 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1846 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1848 leaderActorContext.setConfigParams(configParams);
1850 leaderActorContext.setReplicatedLog(
1851 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1853 Map<String, String> peerAddresses = new HashMap<>();
1854 peerAddresses.put(follower1ActorId,
1855 follower1Actor.path().toString());
1856 peerAddresses.put(follower2ActorId,
1857 follower2Actor.path().toString());
1859 leaderActorContext.setPeerAddresses(peerAddresses);
1860 leaderActorContext.getTermInformation().update(1, leaderActorId);
1862 RaftActorBehavior leader = createBehavior(leaderActorContext);
1864 leaderActor.underlyingActor().setBehavior(leader);
1866 for(int i=1;i<6;i++) {
1867 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1868 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1869 assertTrue(newBehavior == leader);
1870 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1873 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1874 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1876 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1877 heartbeats.size() > 1);
1879 // Check if follower-2 got AppendEntries during this time and was not starved
1880 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1882 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1883 appendEntries.size() > 1);
1889 public void testReplicationConsensusWithNonVotingFollower() {
1890 logStart("testReplicationConsensusWithNonVotingFollower");
1892 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1893 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1894 new FiniteDuration(1000, TimeUnit.SECONDS));
1896 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1898 String nonVotingFollowerId = "nonvoting-follower";
1899 TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1900 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1902 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1904 leader = new Leader(leaderActorContext);
1906 // Ignore initial heartbeats
1907 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1908 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1910 MessageCollectorActor.clearMessages(followerActor);
1911 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1912 MessageCollectorActor.clearMessages(leaderActor);
1914 // Send a Replicate message and wait for AppendEntries.
1915 sendReplicate(leaderActorContext, 0);
1917 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1918 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1920 // Send reply only from the voting follower and verify consensus via ApplyState.
1921 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1923 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1925 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
1927 MessageCollectorActor.clearMessages(followerActor);
1928 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1929 MessageCollectorActor.clearMessages(leaderActor);
1931 // Send another Replicate message
1932 sendReplicate(leaderActorContext, 1);
1934 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1935 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
1936 AppendEntries.class);
1937 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1938 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
1940 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
1941 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
1943 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
1945 // Send reply from the voting follower and verify consensus.
1946 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1948 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1952 public void testTransferLeadershipWithFollowerInSync() {
1953 logStart("testTransferLeadershipWithFollowerInSync");
1955 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1956 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1957 new FiniteDuration(1000, TimeUnit.SECONDS));
1958 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1960 leader = new Leader(leaderActorContext);
1962 // Initial heartbeat
1963 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1964 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
1965 MessageCollectorActor.clearMessages(followerActor);
1967 sendReplicate(leaderActorContext, 0);
1968 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1970 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1971 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1972 MessageCollectorActor.clearMessages(followerActor);
1974 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
1975 leader.transferLeadership(mockTransferCohort);
1977 verify(mockTransferCohort, never()).transferComplete();
1978 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1979 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1981 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
1982 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1984 // Leader should force an election timeout
1985 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
1987 verify(mockTransferCohort).transferComplete();
1991 public void testTransferLeadershipWithEmptyLog() {
1992 logStart("testTransferLeadershipWithEmptyLog");
1994 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1995 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1996 new FiniteDuration(1000, TimeUnit.SECONDS));
1997 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1999 leader = new Leader(leaderActorContext);
2001 // Initial heartbeat
2002 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2003 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2004 MessageCollectorActor.clearMessages(followerActor);
2006 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2007 leader.transferLeadership(mockTransferCohort);
2009 verify(mockTransferCohort, never()).transferComplete();
2010 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2011 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2013 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2014 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2016 // Leader should force an election timeout
2017 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2019 verify(mockTransferCohort).transferComplete();
2023 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2024 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2026 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2027 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2028 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2030 leader = new Leader(leaderActorContext);
2032 // Initial heartbeat
2033 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2034 MessageCollectorActor.clearMessages(followerActor);
2036 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2037 leader.transferLeadership(mockTransferCohort);
2039 verify(mockTransferCohort, never()).transferComplete();
2041 // Sync up the follower.
2042 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2043 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2044 MessageCollectorActor.clearMessages(followerActor);
2046 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2047 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2048 leader.handleMessage(leaderActor, new SendHeartBeat());
2049 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2050 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2052 // Leader should force an election timeout
2053 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2055 verify(mockTransferCohort).transferComplete();
2059 public void testTransferLeadershipWithFollowerSyncTimeout() {
2060 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2062 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2063 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2064 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2065 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2066 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2068 leader = new Leader(leaderActorContext);
2070 // Initial heartbeat
2071 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2072 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2073 MessageCollectorActor.clearMessages(followerActor);
2075 sendReplicate(leaderActorContext, 0);
2076 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2078 MessageCollectorActor.clearMessages(followerActor);
2080 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2081 leader.transferLeadership(mockTransferCohort);
2083 verify(mockTransferCohort, never()).transferComplete();
2085 // Send heartbeats to time out the transfer.
2086 for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2087 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2088 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2089 leader.handleMessage(leaderActor, new SendHeartBeat());
2092 verify(mockTransferCohort).abortTransfer();
2093 verify(mockTransferCohort, never()).transferComplete();
2094 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2098 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
2099 ActorRef actorRef, RaftRPC rpc) throws Exception {
2100 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2101 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2104 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2106 private final long electionTimeOutIntervalMillis;
2107 private final int snapshotChunkSize;
2109 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2111 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2112 this.snapshotChunkSize = snapshotChunkSize;
2116 public FiniteDuration getElectionTimeOutInterval() {
2117 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2121 public int getSnapshotChunkSize() {
2122 return snapshotChunkSize;