1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
29 import org.opendaylight.controller.cluster.raft.SerializationUtils;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
34 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
37 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
40 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
47 import scala.concurrent.duration.FiniteDuration;
49 public class LeaderTest extends AbstractLeaderTest {
51 static final String FOLLOWER_ID = "follower";
53 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
54 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
56 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
57 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
59 private Leader leader;
63 public void tearDown() throws Exception {
72 public void testHandleMessageForUnknownMessage() throws Exception {
73 logStart("testHandleMessageForUnknownMessage");
75 leader = new Leader(createActorContext());
77 // handle message should return the Leader state when it receives an
79 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
80 Assert.assertTrue(behavior instanceof Leader);
84 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
85 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
87 MockRaftActorContext actorContext = createActorContextWithFollower();
90 actorContext.getTermInformation().update(term, "");
92 leader = new Leader(actorContext);
94 // Leader should send an immediate heartbeat with no entries as follower is inactive.
95 long lastIndex = actorContext.getReplicatedLog().lastIndex();
96 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
97 assertEquals("getTerm", term, appendEntries.getTerm());
98 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
99 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
100 assertEquals("Entries size", 0, appendEntries.getEntries().size());
102 // The follower would normally reply - simulate that explicitly here.
103 leader.handleMessage(followerActor, new AppendEntriesReply(
104 FOLLOWER_ID, term, true, lastIndex - 1, term));
105 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
107 followerActor.underlyingActor().clear();
109 // Sleep for the heartbeat interval so AppendEntries is sent.
110 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
111 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
113 leader.handleMessage(leaderActor, new SendHeartBeat());
115 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
116 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
117 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
118 assertEquals("Entries size", 1, appendEntries.getEntries().size());
119 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
120 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
124 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
125 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
127 MockRaftActorContext actorContext = createActorContextWithFollower();
130 actorContext.getTermInformation().update(term, "");
132 leader = new Leader(actorContext);
134 // Leader will send an immediate heartbeat - ignore it.
135 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
137 // The follower would normally reply - simulate that explicitly here.
138 long lastIndex = actorContext.getReplicatedLog().lastIndex();
139 leader.handleMessage(followerActor, new AppendEntriesReply(
140 FOLLOWER_ID, term, true, lastIndex, term));
141 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
143 followerActor.underlyingActor().clear();
145 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
146 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
147 1, lastIndex + 1, payload);
148 actorContext.getReplicatedLog().append(newEntry);
149 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
150 new Replicate(null, null, newEntry));
152 // State should not change
153 assertTrue(raftBehavior instanceof Leader);
155 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
156 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
157 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
158 assertEquals("Entries size", 1, appendEntries.getEntries().size());
159 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
160 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
161 assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
165 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
166 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
168 MockRaftActorContext actorContext = createActorContext();
170 leader = new Leader(actorContext);
172 actorContext.setLastApplied(0);
174 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
175 long term = actorContext.getTermInformation().getCurrentTerm();
176 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
177 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
179 actorContext.getReplicatedLog().append(newEntry);
181 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
182 new Replicate(leaderActor, "state-id", newEntry));
184 // State should not change
185 assertTrue(raftBehavior instanceof Leader);
187 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
189 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
190 // one since lastApplied state is 0.
191 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
192 leaderActor, ApplyState.class);
193 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
195 for(int i = 0; i <= newLogIndex - 1; i++ ) {
196 ApplyState applyState = applyStateList.get(i);
197 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
198 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
201 ApplyState last = applyStateList.get((int) newLogIndex - 1);
202 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
203 assertEquals("getIdentifier", "state-id", last.getIdentifier());
207 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
208 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
210 MockRaftActorContext actorContext = createActorContextWithFollower();
212 Map<String, String> leadersSnapshot = new HashMap<>();
213 leadersSnapshot.put("1", "A");
214 leadersSnapshot.put("2", "B");
215 leadersSnapshot.put("3", "C");
218 actorContext.getReplicatedLog().removeFrom(0);
220 final int followersLastIndex = 2;
221 final int snapshotIndex = 3;
222 final int newEntryIndex = 4;
223 final int snapshotTerm = 1;
224 final int currentTerm = 2;
226 // set the snapshot variables in replicatedlog
227 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
228 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
229 actorContext.setCommitIndex(followersLastIndex);
230 //set follower timeout to 2 mins, helps during debugging
231 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
233 leader = new Leader(actorContext);
236 ReplicatedLogImplEntry entry =
237 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
238 new MockRaftActorContext.MockPayload("D"));
240 //update follower timestamp
241 leader.markFollowerActive(FOLLOWER_ID);
243 ByteString bs = toByteString(leadersSnapshot);
244 leader.setSnapshot(Optional.of(bs));
245 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
246 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
248 //send first chunk and no InstallSnapshotReply received yet
250 fts.incrementChunkIndex();
252 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
253 TimeUnit.MILLISECONDS);
255 leader.handleMessage(leaderActor, new SendHeartBeat());
257 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
259 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
261 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
263 //InstallSnapshotReply received
264 fts.markSendStatus(true);
266 leader.handleMessage(leaderActor, new SendHeartBeat());
268 InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor,
269 InstallSnapshot.SERIALIZABLE_CLASS);
271 InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
273 assertEquals(snapshotIndex, is.getLastIncludedIndex());
277 public void testSendAppendEntriesSnapshotScenario() throws Exception {
278 logStart("testSendAppendEntriesSnapshotScenario");
280 MockRaftActorContext actorContext = createActorContextWithFollower();
282 Map<String, String> leadersSnapshot = new HashMap<>();
283 leadersSnapshot.put("1", "A");
284 leadersSnapshot.put("2", "B");
285 leadersSnapshot.put("3", "C");
288 actorContext.getReplicatedLog().removeFrom(0);
290 final int followersLastIndex = 2;
291 final int snapshotIndex = 3;
292 final int newEntryIndex = 4;
293 final int snapshotTerm = 1;
294 final int currentTerm = 2;
296 // set the snapshot variables in replicatedlog
297 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
298 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
299 actorContext.setCommitIndex(followersLastIndex);
301 leader = new Leader(actorContext);
303 // Leader will send an immediate heartbeat - ignore it.
304 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
307 ReplicatedLogImplEntry entry =
308 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
309 new MockRaftActorContext.MockPayload("D"));
311 //update follower timestamp
312 leader.markFollowerActive(FOLLOWER_ID);
314 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
315 RaftActorBehavior raftBehavior = leader.handleMessage(
316 leaderActor, new Replicate(null, "state-id", entry));
318 assertTrue(raftBehavior instanceof Leader);
320 MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
324 public void testInitiateInstallSnapshot() throws Exception {
325 logStart("testInitiateInstallSnapshot");
327 MockRaftActorContext actorContext = createActorContextWithFollower();
329 Map<String, String> leadersSnapshot = new HashMap<>();
330 leadersSnapshot.put("1", "A");
331 leadersSnapshot.put("2", "B");
332 leadersSnapshot.put("3", "C");
335 actorContext.getReplicatedLog().removeFrom(0);
337 final int followersLastIndex = 2;
338 final int snapshotIndex = 3;
339 final int newEntryIndex = 4;
340 final int snapshotTerm = 1;
341 final int currentTerm = 2;
343 // set the snapshot variables in replicatedlog
344 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
345 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
346 actorContext.setLastApplied(3);
347 actorContext.setCommitIndex(followersLastIndex);
349 leader = new Leader(actorContext);
351 // Leader will send an immediate heartbeat - ignore it.
352 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
354 // set the snapshot as absent and check if capture-snapshot is invoked.
355 leader.setSnapshot(Optional.<ByteString>absent());
358 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
359 new MockRaftActorContext.MockPayload("D"));
361 actorContext.getReplicatedLog().append(entry);
363 //update follower timestamp
364 leader.markFollowerActive(FOLLOWER_ID);
366 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
368 CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
370 assertTrue(cs.isInstallSnapshotInitiated());
371 assertEquals(3, cs.getLastAppliedIndex());
372 assertEquals(1, cs.getLastAppliedTerm());
373 assertEquals(4, cs.getLastIndex());
374 assertEquals(2, cs.getLastTerm());
376 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
377 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
379 List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
380 assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
384 public void testInstallSnapshot() throws Exception {
385 logStart("testInstallSnapshot");
387 MockRaftActorContext actorContext = createActorContextWithFollower();
389 Map<String, String> leadersSnapshot = new HashMap<>();
390 leadersSnapshot.put("1", "A");
391 leadersSnapshot.put("2", "B");
392 leadersSnapshot.put("3", "C");
395 actorContext.getReplicatedLog().removeFrom(0);
397 final int followersLastIndex = 2;
398 final int snapshotIndex = 3;
399 final int snapshotTerm = 1;
400 final int currentTerm = 2;
402 // set the snapshot variables in replicatedlog
403 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
404 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
405 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
406 actorContext.setCommitIndex(followersLastIndex);
408 leader = new Leader(actorContext);
410 // Ignore initial heartbeat.
411 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
413 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
414 new SendInstallSnapshot(toByteString(leadersSnapshot)));
416 assertTrue(raftBehavior instanceof Leader);
418 // check if installsnapshot gets called with the correct values.
420 InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable(
421 MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class));
423 assertNotNull(installSnapshot.getData());
424 assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
425 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
427 assertEquals(currentTerm, installSnapshot.getTerm());
431 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
432 logStart("testHandleInstallSnapshotReplyLastChunk");
434 MockRaftActorContext actorContext = createActorContextWithFollower();
436 final int followersLastIndex = 2;
437 final int snapshotIndex = 3;
438 final int snapshotTerm = 1;
439 final int currentTerm = 2;
441 actorContext.setCommitIndex(followersLastIndex);
443 leader = new Leader(actorContext);
445 // Ignore initial heartbeat.
446 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
448 Map<String, String> leadersSnapshot = new HashMap<>();
449 leadersSnapshot.put("1", "A");
450 leadersSnapshot.put("2", "B");
451 leadersSnapshot.put("3", "C");
453 // set the snapshot variables in replicatedlog
455 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
456 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
457 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
459 ByteString bs = toByteString(leadersSnapshot);
460 leader.setSnapshot(Optional.of(bs));
461 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
462 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
463 while(!fts.isLastChunk(fts.getChunkIndex())) {
465 fts.incrementChunkIndex();
469 actorContext.getReplicatedLog().removeFrom(0);
471 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
472 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
474 assertTrue(raftBehavior instanceof Leader);
476 assertEquals(0, leader.followerSnapshotSize());
477 assertEquals(1, leader.followerLogSize());
478 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
480 assertEquals(snapshotIndex, fli.getMatchIndex());
481 assertEquals(snapshotIndex, fli.getMatchIndex());
482 assertEquals(snapshotIndex + 1, fli.getNextIndex());
486 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
487 logStart("testSendSnapshotfromInstallSnapshotReply");
489 MockRaftActorContext actorContext = createActorContextWithFollower();
491 final int followersLastIndex = 2;
492 final int snapshotIndex = 3;
493 final int snapshotTerm = 1;
494 final int currentTerm = 2;
496 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
498 public int getSnapshotChunkSize() {
502 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
503 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
505 actorContext.setConfigParams(configParams);
506 actorContext.setCommitIndex(followersLastIndex);
508 leader = new Leader(actorContext);
510 Map<String, String> leadersSnapshot = new HashMap<>();
511 leadersSnapshot.put("1", "A");
512 leadersSnapshot.put("2", "B");
513 leadersSnapshot.put("3", "C");
515 // set the snapshot variables in replicatedlog
516 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
517 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
518 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
520 ByteString bs = toByteString(leadersSnapshot);
521 leader.setSnapshot(Optional.of(bs));
523 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
525 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
526 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
528 assertEquals(1, installSnapshot.getChunkIndex());
529 assertEquals(3, installSnapshot.getTotalChunks());
531 followerActor.underlyingActor().clear();
532 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
533 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
535 installSnapshot = MessageCollectorActor.expectFirstMatching(
536 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
538 assertEquals(2, installSnapshot.getChunkIndex());
539 assertEquals(3, installSnapshot.getTotalChunks());
541 followerActor.underlyingActor().clear();
542 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
543 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
545 installSnapshot = MessageCollectorActor.expectFirstMatching(
546 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
548 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
549 followerActor.underlyingActor().clear();
550 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
551 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
553 installSnapshot = MessageCollectorActor.getFirstMatching(
554 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
556 Assert.assertNull(installSnapshot);
561 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
562 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
564 MockRaftActorContext actorContext = createActorContextWithFollower();
566 final int followersLastIndex = 2;
567 final int snapshotIndex = 3;
568 final int snapshotTerm = 1;
569 final int currentTerm = 2;
571 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
573 public int getSnapshotChunkSize() {
578 actorContext.setCommitIndex(followersLastIndex);
580 leader = new Leader(actorContext);
582 Map<String, String> leadersSnapshot = new HashMap<>();
583 leadersSnapshot.put("1", "A");
584 leadersSnapshot.put("2", "B");
585 leadersSnapshot.put("3", "C");
587 // set the snapshot variables in replicatedlog
588 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
589 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
590 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
592 ByteString bs = toByteString(leadersSnapshot);
593 leader.setSnapshot(Optional.of(bs));
595 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
597 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
598 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
600 assertEquals(1, installSnapshot.getChunkIndex());
601 assertEquals(3, installSnapshot.getTotalChunks());
603 followerActor.underlyingActor().clear();
605 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
606 FOLLOWER_ID, -1, false));
608 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
609 TimeUnit.MILLISECONDS);
611 leader.handleMessage(leaderActor, new SendHeartBeat());
613 installSnapshot = MessageCollectorActor.expectFirstMatching(
614 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
616 assertEquals(1, installSnapshot.getChunkIndex());
617 assertEquals(3, installSnapshot.getTotalChunks());
621 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
622 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
624 MockRaftActorContext actorContext = createActorContextWithFollower();
626 final int followersLastIndex = 2;
627 final int snapshotIndex = 3;
628 final int snapshotTerm = 1;
629 final int currentTerm = 2;
631 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
633 public int getSnapshotChunkSize() {
638 actorContext.setCommitIndex(followersLastIndex);
640 leader = new Leader(actorContext);
642 Map<String, String> leadersSnapshot = new HashMap<>();
643 leadersSnapshot.put("1", "A");
644 leadersSnapshot.put("2", "B");
645 leadersSnapshot.put("3", "C");
647 // set the snapshot variables in replicatedlog
648 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
649 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
650 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
652 ByteString bs = toByteString(leadersSnapshot);
653 leader.setSnapshot(Optional.of(bs));
655 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
657 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
658 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
660 assertEquals(1, installSnapshot.getChunkIndex());
661 assertEquals(3, installSnapshot.getTotalChunks());
662 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
664 int hashCode = installSnapshot.getData().hashCode();
666 followerActor.underlyingActor().clear();
668 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
669 FOLLOWER_ID, 1, true));
671 installSnapshot = MessageCollectorActor.expectFirstMatching(
672 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
674 assertEquals(2, installSnapshot.getChunkIndex());
675 assertEquals(3, installSnapshot.getTotalChunks());
676 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
680 public void testFollowerToSnapshotLogic() {
681 logStart("testFollowerToSnapshotLogic");
683 MockRaftActorContext actorContext = createActorContext();
685 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
687 public int getSnapshotChunkSize() {
692 leader = new Leader(actorContext);
694 Map<String, String> leadersSnapshot = new HashMap<>();
695 leadersSnapshot.put("1", "A");
696 leadersSnapshot.put("2", "B");
697 leadersSnapshot.put("3", "C");
699 ByteString bs = toByteString(leadersSnapshot);
700 byte[] barray = bs.toByteArray();
702 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
703 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
705 assertEquals(bs.size(), barray.length);
708 for (int i=0; i < barray.length; i = i + 50) {
712 if (i + 50 > barray.length) {
716 ByteString chunk = fts.getNextChunk();
717 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
718 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
720 fts.markSendStatus(true);
721 if (!fts.isLastChunk(chunkIndex)) {
722 fts.incrementChunkIndex();
726 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
729 @Override protected RaftActorBehavior createBehavior(
730 RaftActorContext actorContext) {
731 return new Leader(actorContext);
735 protected MockRaftActorContext createActorContext() {
736 return createActorContext(leaderActor);
740 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
741 return createActorContext("leader", actorRef);
744 private MockRaftActorContext createActorContextWithFollower() {
745 MockRaftActorContext actorContext = createActorContext();
746 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
747 followerActor.path().toString()).build());
751 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
752 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
753 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
754 configParams.setElectionTimeoutFactor(100000);
755 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
756 context.setConfigParams(configParams);
761 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
762 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
764 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
766 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
768 Follower follower = new Follower(followerActorContext);
769 followerActor.underlyingActor().setBehavior(follower);
771 Map<String, String> peerAddresses = new HashMap<>();
772 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
774 leaderActorContext.setPeerAddresses(peerAddresses);
776 leaderActorContext.getReplicatedLog().removeFrom(0);
779 leaderActorContext.setReplicatedLog(
780 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
782 leaderActorContext.setCommitIndex(1);
784 followerActorContext.getReplicatedLog().removeFrom(0);
786 // follower too has the exact same log entries and has the same commit index
787 followerActorContext.setReplicatedLog(
788 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
790 followerActorContext.setCommitIndex(1);
792 leader = new Leader(leaderActorContext);
794 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
796 assertEquals(1, appendEntries.getLeaderCommit());
797 assertEquals(0, appendEntries.getEntries().size());
798 assertEquals(0, appendEntries.getPrevLogIndex());
800 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
801 leaderActor, AppendEntriesReply.class);
803 assertEquals(2, appendEntriesReply.getLogLastIndex());
804 assertEquals(1, appendEntriesReply.getLogLastTerm());
806 // follower returns its next index
807 assertEquals(2, appendEntriesReply.getLogLastIndex());
808 assertEquals(1, appendEntriesReply.getLogLastTerm());
814 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
815 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
817 MockRaftActorContext leaderActorContext = createActorContext();
819 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
821 Follower follower = new Follower(followerActorContext);
822 followerActor.underlyingActor().setBehavior(follower);
824 Map<String, String> peerAddresses = new HashMap<>();
825 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
827 leaderActorContext.setPeerAddresses(peerAddresses);
829 leaderActorContext.getReplicatedLog().removeFrom(0);
831 leaderActorContext.setReplicatedLog(
832 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
834 leaderActorContext.setCommitIndex(1);
836 followerActorContext.getReplicatedLog().removeFrom(0);
838 followerActorContext.setReplicatedLog(
839 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
841 // follower has the same log entries but its commit index > leaders commit index
842 followerActorContext.setCommitIndex(2);
844 leader = new Leader(leaderActorContext);
847 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
849 assertEquals(1, appendEntries.getLeaderCommit());
850 assertEquals(0, appendEntries.getEntries().size());
851 assertEquals(0, appendEntries.getPrevLogIndex());
853 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
854 leaderActor, AppendEntriesReply.class);
856 assertEquals(2, appendEntriesReply.getLogLastIndex());
857 assertEquals(1, appendEntriesReply.getLogLastTerm());
859 leaderActor.underlyingActor().setBehavior(follower);
860 leader.handleMessage(followerActor, appendEntriesReply);
862 leaderActor.underlyingActor().clear();
863 followerActor.underlyingActor().clear();
865 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
866 TimeUnit.MILLISECONDS);
868 leader.handleMessage(leaderActor, new SendHeartBeat());
870 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
872 assertEquals(2, appendEntries.getLeaderCommit());
873 assertEquals(0, appendEntries.getEntries().size());
874 assertEquals(2, appendEntries.getPrevLogIndex());
876 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
878 assertEquals(2, appendEntriesReply.getLogLastIndex());
879 assertEquals(1, appendEntriesReply.getLogLastTerm());
881 assertEquals(2, followerActorContext.getCommitIndex());
887 public void testHandleAppendEntriesReplyFailure(){
888 logStart("testHandleAppendEntriesReplyFailure");
890 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
892 leader = new Leader(leaderActorContext);
894 // Send initial heartbeat reply with last index.
895 leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
897 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
898 assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
900 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
902 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
904 assertEquals(RaftState.Leader, raftActorBehavior.state());
906 assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
910 public void testHandleAppendEntriesReplySuccess() throws Exception {
911 logStart("testHandleAppendEntriesReplySuccess");
913 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
915 leaderActorContext.setReplicatedLog(
916 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
918 leaderActorContext.setCommitIndex(1);
919 leaderActorContext.setLastApplied(1);
920 leaderActorContext.getTermInformation().update(1, "leader");
922 leader = new Leader(leaderActorContext);
924 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
926 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
928 assertEquals(RaftState.Leader, raftActorBehavior.state());
930 assertEquals(2, leaderActorContext.getCommitIndex());
932 ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching(
933 leaderActor, ApplyLogEntries.class);
935 assertEquals(2, leaderActorContext.getLastApplied());
937 assertEquals(2, applyLogEntries.getToIndex());
939 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
942 assertEquals(1,applyStateList.size());
944 ApplyState applyState = applyStateList.get(0);
946 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
950 public void testHandleAppendEntriesReplyUnknownFollower(){
951 logStart("testHandleAppendEntriesReplyUnknownFollower");
953 MockRaftActorContext leaderActorContext = createActorContext();
955 leader = new Leader(leaderActorContext);
957 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
959 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
961 assertEquals(RaftState.Leader, raftActorBehavior.state());
965 public void testHandleRequestVoteReply(){
966 logStart("testHandleRequestVoteReply");
968 MockRaftActorContext leaderActorContext = createActorContext();
970 leader = new Leader(leaderActorContext);
972 // Should be a no-op.
973 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
974 new RequestVoteReply(1, true));
976 assertEquals(RaftState.Leader, raftActorBehavior.state());
978 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
980 assertEquals(RaftState.Leader, raftActorBehavior.state());
984 public void testIsolatedLeaderCheckNoFollowers() {
985 logStart("testIsolatedLeaderCheckNoFollowers");
987 MockRaftActorContext leaderActorContext = createActorContext();
989 leader = new Leader(leaderActorContext);
990 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
991 Assert.assertTrue(behavior instanceof Leader);
995 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
996 logStart("testIsolatedLeaderCheckTwoFollowers");
998 new JavaTestKit(getSystem()) {{
1000 ActorRef followerActor1 = getTestActor();
1001 ActorRef followerActor2 = getTestActor();
1003 MockRaftActorContext leaderActorContext = createActorContext();
1005 Map<String, String> peerAddresses = new HashMap<>();
1006 peerAddresses.put("follower-1", followerActor1.path().toString());
1007 peerAddresses.put("follower-2", followerActor2.path().toString());
1009 leaderActorContext.setPeerAddresses(peerAddresses);
1011 leader = new Leader(leaderActorContext);
1013 leader.markFollowerActive("follower-1");
1014 leader.markFollowerActive("follower-2");
1015 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1016 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1017 behavior instanceof Leader);
1019 // kill 1 follower and verify if that got killed
1020 final JavaTestKit probe = new JavaTestKit(getSystem());
1021 probe.watch(followerActor1);
1022 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1023 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1024 assertEquals(termMsg1.getActor(), followerActor1);
1026 leader.markFollowerInActive("follower-1");
1027 leader.markFollowerActive("follower-2");
1028 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1029 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1030 behavior instanceof Leader);
1032 // kill 2nd follower and leader should change to Isolated leader
1033 followerActor2.tell(PoisonPill.getInstance(), null);
1034 probe.watch(followerActor2);
1035 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1036 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1037 assertEquals(termMsg2.getActor(), followerActor2);
1039 leader.markFollowerInActive("follower-2");
1040 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1041 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1042 behavior instanceof IsolatedLeader);
1048 public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1049 logStart("testAppendEntryCallAtEndofAppendEntryReply");
1051 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1053 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1054 //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1055 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1057 leaderActorContext.setConfigParams(configParams);
1059 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1061 followerActorContext.setConfigParams(configParams);
1063 Follower follower = new Follower(followerActorContext);
1064 followerActor.underlyingActor().setBehavior(follower);
1066 leaderActorContext.getReplicatedLog().removeFrom(0);
1067 leaderActorContext.setCommitIndex(-1);
1068 leaderActorContext.setLastApplied(-1);
1070 followerActorContext.getReplicatedLog().removeFrom(0);
1071 followerActorContext.setCommitIndex(-1);
1072 followerActorContext.setLastApplied(-1);
1074 leader = new Leader(leaderActorContext);
1076 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1077 leaderActor, AppendEntriesReply.class);
1079 leader.handleMessage(followerActor, appendEntriesReply);
1081 // Clear initial heartbeat messages
1083 leaderActor.underlyingActor().clear();
1084 followerActor.underlyingActor().clear();
1087 leaderActorContext.setReplicatedLog(
1088 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1089 leaderActorContext.setCommitIndex(1);
1090 leaderActorContext.setLastApplied(1);
1092 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1093 TimeUnit.MILLISECONDS);
1095 leader.handleMessage(leaderActor, new SendHeartBeat());
1097 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1099 // Should send first log entry
1100 assertEquals(1, appendEntries.getLeaderCommit());
1101 assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1102 assertEquals(-1, appendEntries.getPrevLogIndex());
1104 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1106 assertEquals(1, appendEntriesReply.getLogLastTerm());
1107 assertEquals(0, appendEntriesReply.getLogLastIndex());
1109 followerActor.underlyingActor().clear();
1111 leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1113 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1115 // Should send second log entry
1116 assertEquals(1, appendEntries.getLeaderCommit());
1117 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1123 public void testLaggingFollowerStarvation() throws Exception {
1124 logStart("testLaggingFollowerStarvation");
1125 new JavaTestKit(getSystem()) {{
1126 String leaderActorId = actorFactory.generateActorId("leader");
1127 String follower1ActorId = actorFactory.generateActorId("follower");
1128 String follower2ActorId = actorFactory.generateActorId("follower");
1130 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1131 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1132 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1133 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1135 MockRaftActorContext leaderActorContext =
1136 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1138 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1139 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1140 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1142 leaderActorContext.setConfigParams(configParams);
1144 leaderActorContext.setReplicatedLog(
1145 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1147 Map<String, String> peerAddresses = new HashMap<>();
1148 peerAddresses.put(follower1ActorId,
1149 follower1Actor.path().toString());
1150 peerAddresses.put(follower2ActorId,
1151 follower2Actor.path().toString());
1153 leaderActorContext.setPeerAddresses(peerAddresses);
1154 leaderActorContext.getTermInformation().update(1, leaderActorId);
1156 RaftActorBehavior leader = createBehavior(leaderActorContext);
1158 leaderActor.underlyingActor().setBehavior(leader);
1160 for(int i=1;i<6;i++) {
1161 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1162 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
1163 assertTrue(newBehavior == leader);
1164 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1167 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1168 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1170 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1171 heartbeats.size() > 1);
1173 // Check if follower-2 got AppendEntries during this time and was not starved
1174 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1176 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1177 appendEntries.size() > 1);
1183 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1184 ActorRef actorRef, RaftRPC rpc) throws Exception {
1185 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1186 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1189 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1191 private final long electionTimeOutIntervalMillis;
1192 private final int snapshotChunkSize;
1194 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1196 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1197 this.snapshotChunkSize = snapshotChunkSize;
1201 public FiniteDuration getElectionTimeOutInterval() {
1202 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1206 public int getSnapshotChunkSize() {
1207 return snapshotChunkSize;