1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
35 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
38 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
44 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
45 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
46 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
47 import scala.concurrent.duration.FiniteDuration;
49 public class LeaderTest extends AbstractLeaderTest {
51 static final String FOLLOWER_ID = "follower";
52 public static final String LEADER_ID = "leader";
54 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
55 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
57 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
58 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
60 private Leader leader;
64 public void tearDown() throws Exception {
73 public void testHandleMessageForUnknownMessage() throws Exception {
74 logStart("testHandleMessageForUnknownMessage");
76 leader = new Leader(createActorContext());
78 // handle message should return the Leader state when it receives an
80 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
81 Assert.assertTrue(behavior instanceof Leader);
85 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
86 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
88 MockRaftActorContext actorContext = createActorContextWithFollower();
89 short payloadVersion = (short)5;
90 actorContext.setPayloadVersion(payloadVersion);
93 actorContext.getTermInformation().update(term, "");
95 leader = new Leader(actorContext);
97 // Leader should send an immediate heartbeat with no entries as follower is inactive.
98 long lastIndex = actorContext.getReplicatedLog().lastIndex();
99 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
100 assertEquals("getTerm", term, appendEntries.getTerm());
101 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
102 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
103 assertEquals("Entries size", 0, appendEntries.getEntries().size());
104 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
106 // The follower would normally reply - simulate that explicitly here.
107 leader.handleMessage(followerActor, new AppendEntriesReply(
108 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
109 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
111 followerActor.underlyingActor().clear();
113 // Sleep for the heartbeat interval so AppendEntries is sent.
114 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
115 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
117 leader.handleMessage(leaderActor, new SendHeartBeat());
119 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
120 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
121 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
122 assertEquals("Entries size", 1, appendEntries.getEntries().size());
123 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
124 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
125 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
129 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
130 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
131 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
133 actorContext.getReplicatedLog().append(newEntry);
134 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
138 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
139 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
141 MockRaftActorContext actorContext = createActorContextWithFollower();
144 actorContext.getTermInformation().update(term, "");
146 leader = new Leader(actorContext);
148 // Leader will send an immediate heartbeat - ignore it.
149 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
151 // The follower would normally reply - simulate that explicitly here.
152 long lastIndex = actorContext.getReplicatedLog().lastIndex();
153 leader.handleMessage(followerActor, new AppendEntriesReply(
154 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
155 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
157 followerActor.underlyingActor().clear();
159 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
161 // State should not change
162 assertTrue(raftBehavior instanceof Leader);
164 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
165 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
166 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
167 assertEquals("Entries size", 1, appendEntries.getEntries().size());
168 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
169 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
170 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
174 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
175 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
177 MockRaftActorContext actorContext = createActorContextWithFollower();
178 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
180 public FiniteDuration getHeartBeatInterval() {
181 return FiniteDuration.apply(5, TimeUnit.SECONDS);
186 actorContext.getTermInformation().update(term, "");
188 leader = new Leader(actorContext);
190 // Leader will send an immediate heartbeat - ignore it.
191 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
193 // The follower would normally reply - simulate that explicitly here.
194 long lastIndex = actorContext.getReplicatedLog().lastIndex();
195 leader.handleMessage(followerActor, new AppendEntriesReply(
196 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
197 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
199 followerActor.underlyingActor().clear();
201 for(int i=0;i<5;i++) {
202 sendReplicate(actorContext, lastIndex+i+1);
205 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
206 // We expect only 1 message to be sent because of two reasons,
207 // - an append entries reply was not received
208 // - the heartbeat interval has not expired
209 // In this scenario if multiple messages are sent they would likely be duplicates
210 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
214 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
215 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
217 MockRaftActorContext actorContext = createActorContextWithFollower();
218 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
220 public FiniteDuration getHeartBeatInterval() {
221 return FiniteDuration.apply(5, TimeUnit.SECONDS);
226 actorContext.getTermInformation().update(term, "");
228 leader = new Leader(actorContext);
230 // Leader will send an immediate heartbeat - ignore it.
231 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
233 // The follower would normally reply - simulate that explicitly here.
234 long lastIndex = actorContext.getReplicatedLog().lastIndex();
235 leader.handleMessage(followerActor, new AppendEntriesReply(
236 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
237 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
239 followerActor.underlyingActor().clear();
241 for(int i=0;i<3;i++) {
242 sendReplicate(actorContext, lastIndex+i+1);
243 leader.handleMessage(followerActor, new AppendEntriesReply(
244 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
248 for(int i=3;i<5;i++) {
249 sendReplicate(actorContext, lastIndex + i + 1);
252 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
253 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
254 // get sent to the follower - but not the 5th
255 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
257 for(int i=0;i<4;i++) {
258 long expected = allMessages.get(i).getEntries().get(0).getIndex();
259 assertEquals(expected, i+2);
264 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
265 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
267 MockRaftActorContext actorContext = createActorContextWithFollower();
268 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
270 public FiniteDuration getHeartBeatInterval() {
271 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
276 actorContext.getTermInformation().update(term, "");
278 leader = new Leader(actorContext);
280 // Leader will send an immediate heartbeat - ignore it.
281 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
283 // The follower would normally reply - simulate that explicitly here.
284 long lastIndex = actorContext.getReplicatedLog().lastIndex();
285 leader.handleMessage(followerActor, new AppendEntriesReply(
286 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
287 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
289 followerActor.underlyingActor().clear();
291 sendReplicate(actorContext, lastIndex+1);
293 // Wait slightly longer than heartbeat duration
294 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
296 leader.handleMessage(leaderActor, new SendHeartBeat());
298 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
299 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
301 assertEquals(1, allMessages.get(0).getEntries().size());
302 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
303 assertEquals(1, allMessages.get(1).getEntries().size());
304 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
309 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
310 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
312 MockRaftActorContext actorContext = createActorContextWithFollower();
313 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
315 public FiniteDuration getHeartBeatInterval() {
316 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
321 actorContext.getTermInformation().update(term, "");
323 leader = new Leader(actorContext);
325 // Leader will send an immediate heartbeat - ignore it.
326 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
328 // The follower would normally reply - simulate that explicitly here.
329 long lastIndex = actorContext.getReplicatedLog().lastIndex();
330 leader.handleMessage(followerActor, new AppendEntriesReply(
331 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
332 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
334 followerActor.underlyingActor().clear();
336 for(int i=0;i<3;i++) {
337 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
338 leader.handleMessage(leaderActor, new SendHeartBeat());
341 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
342 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
346 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
347 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
349 MockRaftActorContext actorContext = createActorContextWithFollower();
350 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
352 public FiniteDuration getHeartBeatInterval() {
353 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
358 actorContext.getTermInformation().update(term, "");
360 leader = new Leader(actorContext);
362 // Leader will send an immediate heartbeat - ignore it.
363 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
365 // The follower would normally reply - simulate that explicitly here.
366 long lastIndex = actorContext.getReplicatedLog().lastIndex();
367 leader.handleMessage(followerActor, new AppendEntriesReply(
368 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
369 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
371 followerActor.underlyingActor().clear();
373 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
374 leader.handleMessage(leaderActor, new SendHeartBeat());
375 sendReplicate(actorContext, lastIndex+1);
377 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
378 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
380 assertEquals(0, allMessages.get(0).getEntries().size());
381 assertEquals(1, allMessages.get(1).getEntries().size());
386 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
387 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
389 MockRaftActorContext actorContext = createActorContext();
391 leader = new Leader(actorContext);
393 actorContext.setLastApplied(0);
395 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
396 long term = actorContext.getTermInformation().getCurrentTerm();
397 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
398 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
400 actorContext.getReplicatedLog().append(newEntry);
402 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
403 new Replicate(leaderActor, "state-id", newEntry));
405 // State should not change
406 assertTrue(raftBehavior instanceof Leader);
408 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
410 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
411 // one since lastApplied state is 0.
412 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
413 leaderActor, ApplyState.class);
414 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
416 for(int i = 0; i <= newLogIndex - 1; i++ ) {
417 ApplyState applyState = applyStateList.get(i);
418 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
419 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
422 ApplyState last = applyStateList.get((int) newLogIndex - 1);
423 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
424 assertEquals("getIdentifier", "state-id", last.getIdentifier());
428 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
429 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
431 MockRaftActorContext actorContext = createActorContextWithFollower();
433 Map<String, String> leadersSnapshot = new HashMap<>();
434 leadersSnapshot.put("1", "A");
435 leadersSnapshot.put("2", "B");
436 leadersSnapshot.put("3", "C");
439 actorContext.getReplicatedLog().removeFrom(0);
441 final int followersLastIndex = 2;
442 final int snapshotIndex = 3;
443 final int newEntryIndex = 4;
444 final int snapshotTerm = 1;
445 final int currentTerm = 2;
447 // set the snapshot variables in replicatedlog
448 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
449 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
450 actorContext.setCommitIndex(followersLastIndex);
451 //set follower timeout to 2 mins, helps during debugging
452 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
454 leader = new Leader(actorContext);
457 ReplicatedLogImplEntry entry =
458 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
459 new MockRaftActorContext.MockPayload("D"));
461 //update follower timestamp
462 leader.markFollowerActive(FOLLOWER_ID);
464 ByteString bs = toByteString(leadersSnapshot);
465 leader.setSnapshot(Optional.of(bs));
466 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
467 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
469 //send first chunk and no InstallSnapshotReply received yet
471 fts.incrementChunkIndex();
473 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
474 TimeUnit.MILLISECONDS);
476 leader.handleMessage(leaderActor, new SendHeartBeat());
478 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
480 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
482 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
484 //InstallSnapshotReply received
485 fts.markSendStatus(true);
487 leader.handleMessage(leaderActor, new SendHeartBeat());
489 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
491 assertEquals(snapshotIndex, is.getLastIncludedIndex());
495 public void testSendAppendEntriesSnapshotScenario() throws Exception {
496 logStart("testSendAppendEntriesSnapshotScenario");
498 MockRaftActorContext actorContext = createActorContextWithFollower();
500 Map<String, String> leadersSnapshot = new HashMap<>();
501 leadersSnapshot.put("1", "A");
502 leadersSnapshot.put("2", "B");
503 leadersSnapshot.put("3", "C");
506 actorContext.getReplicatedLog().removeFrom(0);
508 final int followersLastIndex = 2;
509 final int snapshotIndex = 3;
510 final int newEntryIndex = 4;
511 final int snapshotTerm = 1;
512 final int currentTerm = 2;
514 // set the snapshot variables in replicatedlog
515 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
516 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
517 actorContext.setCommitIndex(followersLastIndex);
519 leader = new Leader(actorContext);
521 // Leader will send an immediate heartbeat - ignore it.
522 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
525 ReplicatedLogImplEntry entry =
526 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
527 new MockRaftActorContext.MockPayload("D"));
529 actorContext.getReplicatedLog().append(entry);
531 //update follower timestamp
532 leader.markFollowerActive(FOLLOWER_ID);
534 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
535 RaftActorBehavior raftBehavior = leader.handleMessage(
536 leaderActor, new Replicate(null, "state-id", entry));
538 assertTrue(raftBehavior instanceof Leader);
540 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
544 public void testInitiateInstallSnapshot() throws Exception {
545 logStart("testInitiateInstallSnapshot");
547 MockRaftActorContext actorContext = createActorContextWithFollower();
549 Map<String, String> leadersSnapshot = new HashMap<>();
550 leadersSnapshot.put("1", "A");
551 leadersSnapshot.put("2", "B");
552 leadersSnapshot.put("3", "C");
555 actorContext.getReplicatedLog().removeFrom(0);
557 final int followersLastIndex = 2;
558 final int snapshotIndex = 3;
559 final int newEntryIndex = 4;
560 final int snapshotTerm = 1;
561 final int currentTerm = 2;
563 // set the snapshot variables in replicatedlog
564 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
565 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
566 actorContext.setLastApplied(3);
567 actorContext.setCommitIndex(followersLastIndex);
569 leader = new Leader(actorContext);
571 // Leader will send an immediate heartbeat - ignore it.
572 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
574 // set the snapshot as absent and check if capture-snapshot is invoked.
575 leader.setSnapshot(Optional.<ByteString>absent());
578 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
579 new MockRaftActorContext.MockPayload("D"));
581 actorContext.getReplicatedLog().append(entry);
583 //update follower timestamp
584 leader.markFollowerActive(FOLLOWER_ID);
586 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
588 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
590 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
592 assertTrue(cs.isInstallSnapshotInitiated());
593 assertEquals(3, cs.getLastAppliedIndex());
594 assertEquals(1, cs.getLastAppliedTerm());
595 assertEquals(4, cs.getLastIndex());
596 assertEquals(2, cs.getLastTerm());
598 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
599 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
601 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
605 public void testInstallSnapshot() throws Exception {
606 logStart("testInstallSnapshot");
608 MockRaftActorContext actorContext = createActorContextWithFollower();
610 Map<String, String> leadersSnapshot = new HashMap<>();
611 leadersSnapshot.put("1", "A");
612 leadersSnapshot.put("2", "B");
613 leadersSnapshot.put("3", "C");
616 actorContext.getReplicatedLog().removeFrom(0);
618 final int followersLastIndex = 2;
619 final int snapshotIndex = 3;
620 final int snapshotTerm = 1;
621 final int currentTerm = 2;
623 // set the snapshot variables in replicatedlog
624 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
625 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
626 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
627 actorContext.setCommitIndex(followersLastIndex);
629 leader = new Leader(actorContext);
631 // Ignore initial heartbeat.
632 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
634 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
635 new SendInstallSnapshot(toByteString(leadersSnapshot)));
637 assertTrue(raftBehavior instanceof Leader);
639 // check if installsnapshot gets called with the correct values.
641 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
643 assertNotNull(installSnapshot.getData());
644 assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
645 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
647 assertEquals(currentTerm, installSnapshot.getTerm());
651 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
652 logStart("testHandleInstallSnapshotReplyLastChunk");
654 MockRaftActorContext actorContext = createActorContextWithFollower();
656 final int followersLastIndex = 2;
657 final int snapshotIndex = 3;
658 final int snapshotTerm = 1;
659 final int currentTerm = 2;
661 actorContext.setCommitIndex(followersLastIndex);
663 leader = new Leader(actorContext);
665 // Ignore initial heartbeat.
666 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
668 Map<String, String> leadersSnapshot = new HashMap<>();
669 leadersSnapshot.put("1", "A");
670 leadersSnapshot.put("2", "B");
671 leadersSnapshot.put("3", "C");
673 // set the snapshot variables in replicatedlog
675 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
676 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
677 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
679 ByteString bs = toByteString(leadersSnapshot);
680 leader.setSnapshot(Optional.of(bs));
681 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
682 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
683 while(!fts.isLastChunk(fts.getChunkIndex())) {
685 fts.incrementChunkIndex();
689 actorContext.getReplicatedLog().removeFrom(0);
691 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
692 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
694 assertTrue(raftBehavior instanceof Leader);
696 assertEquals(0, leader.followerSnapshotSize());
697 assertEquals(1, leader.followerLogSize());
698 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
700 assertEquals(snapshotIndex, fli.getMatchIndex());
701 assertEquals(snapshotIndex, fli.getMatchIndex());
702 assertEquals(snapshotIndex + 1, fli.getNextIndex());
706 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
707 logStart("testSendSnapshotfromInstallSnapshotReply");
709 MockRaftActorContext actorContext = createActorContextWithFollower();
711 final int followersLastIndex = 2;
712 final int snapshotIndex = 3;
713 final int snapshotTerm = 1;
714 final int currentTerm = 2;
716 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
718 public int getSnapshotChunkSize() {
722 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
723 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
725 actorContext.setConfigParams(configParams);
726 actorContext.setCommitIndex(followersLastIndex);
728 leader = new Leader(actorContext);
730 Map<String, String> leadersSnapshot = new HashMap<>();
731 leadersSnapshot.put("1", "A");
732 leadersSnapshot.put("2", "B");
733 leadersSnapshot.put("3", "C");
735 // set the snapshot variables in replicatedlog
736 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
737 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
738 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
740 ByteString bs = toByteString(leadersSnapshot);
741 leader.setSnapshot(Optional.of(bs));
743 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
745 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
747 assertEquals(1, installSnapshot.getChunkIndex());
748 assertEquals(3, installSnapshot.getTotalChunks());
750 followerActor.underlyingActor().clear();
751 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
752 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
754 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
756 assertEquals(2, installSnapshot.getChunkIndex());
757 assertEquals(3, installSnapshot.getTotalChunks());
759 followerActor.underlyingActor().clear();
760 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
761 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
763 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
765 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
766 followerActor.underlyingActor().clear();
767 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
768 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
770 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
772 Assert.assertNull(installSnapshot);
777 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
778 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
780 MockRaftActorContext actorContext = createActorContextWithFollower();
782 final int followersLastIndex = 2;
783 final int snapshotIndex = 3;
784 final int snapshotTerm = 1;
785 final int currentTerm = 2;
787 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
789 public int getSnapshotChunkSize() {
794 actorContext.setCommitIndex(followersLastIndex);
796 leader = new Leader(actorContext);
798 Map<String, String> leadersSnapshot = new HashMap<>();
799 leadersSnapshot.put("1", "A");
800 leadersSnapshot.put("2", "B");
801 leadersSnapshot.put("3", "C");
803 // set the snapshot variables in replicatedlog
804 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
805 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
806 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
808 ByteString bs = toByteString(leadersSnapshot);
809 leader.setSnapshot(Optional.of(bs));
811 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
812 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
814 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
816 assertEquals(1, installSnapshot.getChunkIndex());
817 assertEquals(3, installSnapshot.getTotalChunks());
819 followerActor.underlyingActor().clear();
821 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
822 FOLLOWER_ID, -1, false));
824 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
825 TimeUnit.MILLISECONDS);
827 leader.handleMessage(leaderActor, new SendHeartBeat());
829 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
831 assertEquals(1, installSnapshot.getChunkIndex());
832 assertEquals(3, installSnapshot.getTotalChunks());
836 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
837 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
839 MockRaftActorContext actorContext = createActorContextWithFollower();
841 final int followersLastIndex = 2;
842 final int snapshotIndex = 3;
843 final int snapshotTerm = 1;
844 final int currentTerm = 2;
846 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
848 public int getSnapshotChunkSize() {
853 actorContext.setCommitIndex(followersLastIndex);
855 leader = new Leader(actorContext);
857 Map<String, String> leadersSnapshot = new HashMap<>();
858 leadersSnapshot.put("1", "A");
859 leadersSnapshot.put("2", "B");
860 leadersSnapshot.put("3", "C");
862 // set the snapshot variables in replicatedlog
863 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
864 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
865 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
867 ByteString bs = toByteString(leadersSnapshot);
868 leader.setSnapshot(Optional.of(bs));
870 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
872 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
874 assertEquals(1, installSnapshot.getChunkIndex());
875 assertEquals(3, installSnapshot.getTotalChunks());
876 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
878 int hashCode = installSnapshot.getData().hashCode();
880 followerActor.underlyingActor().clear();
882 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
883 FOLLOWER_ID, 1, true));
885 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
887 assertEquals(2, installSnapshot.getChunkIndex());
888 assertEquals(3, installSnapshot.getTotalChunks());
889 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
893 public void testFollowerToSnapshotLogic() {
894 logStart("testFollowerToSnapshotLogic");
896 MockRaftActorContext actorContext = createActorContext();
898 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
900 public int getSnapshotChunkSize() {
905 leader = new Leader(actorContext);
907 Map<String, String> leadersSnapshot = new HashMap<>();
908 leadersSnapshot.put("1", "A");
909 leadersSnapshot.put("2", "B");
910 leadersSnapshot.put("3", "C");
912 ByteString bs = toByteString(leadersSnapshot);
913 byte[] barray = bs.toByteArray();
915 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
916 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
918 assertEquals(bs.size(), barray.length);
921 for (int i=0; i < barray.length; i = i + 50) {
925 if (i + 50 > barray.length) {
929 ByteString chunk = fts.getNextChunk();
930 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
931 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
933 fts.markSendStatus(true);
934 if (!fts.isLastChunk(chunkIndex)) {
935 fts.incrementChunkIndex();
939 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
942 @Override protected RaftActorBehavior createBehavior(
943 RaftActorContext actorContext) {
944 return new Leader(actorContext);
948 protected MockRaftActorContext createActorContext() {
949 return createActorContext(leaderActor);
953 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
954 return createActorContext(LEADER_ID, actorRef);
957 private MockRaftActorContext createActorContextWithFollower() {
958 MockRaftActorContext actorContext = createActorContext();
959 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
960 followerActor.path().toString()).build());
964 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
965 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
966 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
967 configParams.setElectionTimeoutFactor(100000);
968 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
969 context.setConfigParams(configParams);
973 private MockRaftActorContext createFollowerActorContextWithLeader() {
974 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
975 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
976 followerConfig.setElectionTimeoutFactor(10000);
977 followerActorContext.setConfigParams(followerConfig);
978 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
979 return followerActorContext;
983 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
984 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
986 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
988 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
990 Follower follower = new Follower(followerActorContext);
991 followerActor.underlyingActor().setBehavior(follower);
993 Map<String, String> peerAddresses = new HashMap<>();
994 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
996 leaderActorContext.setPeerAddresses(peerAddresses);
998 leaderActorContext.getReplicatedLog().removeFrom(0);
1001 leaderActorContext.setReplicatedLog(
1002 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1004 leaderActorContext.setCommitIndex(1);
1006 followerActorContext.getReplicatedLog().removeFrom(0);
1008 // follower too has the exact same log entries and has the same commit index
1009 followerActorContext.setReplicatedLog(
1010 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1012 followerActorContext.setCommitIndex(1);
1014 leader = new Leader(leaderActorContext);
1016 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1018 assertEquals(1, appendEntries.getLeaderCommit());
1019 assertEquals(0, appendEntries.getEntries().size());
1020 assertEquals(0, appendEntries.getPrevLogIndex());
1022 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1023 leaderActor, AppendEntriesReply.class);
1025 assertEquals(2, appendEntriesReply.getLogLastIndex());
1026 assertEquals(1, appendEntriesReply.getLogLastTerm());
1028 // follower returns its next index
1029 assertEquals(2, appendEntriesReply.getLogLastIndex());
1030 assertEquals(1, appendEntriesReply.getLogLastTerm());
1036 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1037 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1039 MockRaftActorContext leaderActorContext = createActorContext();
1041 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1042 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1044 Follower follower = new Follower(followerActorContext);
1045 followerActor.underlyingActor().setBehavior(follower);
1047 Map<String, String> leaderPeerAddresses = new HashMap<>();
1048 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1050 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1052 leaderActorContext.getReplicatedLog().removeFrom(0);
1054 leaderActorContext.setReplicatedLog(
1055 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1057 leaderActorContext.setCommitIndex(1);
1059 followerActorContext.getReplicatedLog().removeFrom(0);
1061 followerActorContext.setReplicatedLog(
1062 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1064 // follower has the same log entries but its commit index > leaders commit index
1065 followerActorContext.setCommitIndex(2);
1067 leader = new Leader(leaderActorContext);
1069 // Initial heartbeat
1070 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1072 assertEquals(1, appendEntries.getLeaderCommit());
1073 assertEquals(0, appendEntries.getEntries().size());
1074 assertEquals(0, appendEntries.getPrevLogIndex());
1076 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1077 leaderActor, AppendEntriesReply.class);
1079 assertEquals(2, appendEntriesReply.getLogLastIndex());
1080 assertEquals(1, appendEntriesReply.getLogLastTerm());
1082 leaderActor.underlyingActor().setBehavior(follower);
1083 leader.handleMessage(followerActor, appendEntriesReply);
1085 leaderActor.underlyingActor().clear();
1086 followerActor.underlyingActor().clear();
1088 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1089 TimeUnit.MILLISECONDS);
1091 leader.handleMessage(leaderActor, new SendHeartBeat());
1093 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1095 assertEquals(2, appendEntries.getLeaderCommit());
1096 assertEquals(0, appendEntries.getEntries().size());
1097 assertEquals(2, appendEntries.getPrevLogIndex());
1099 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1101 assertEquals(2, appendEntriesReply.getLogLastIndex());
1102 assertEquals(1, appendEntriesReply.getLogLastTerm());
1104 assertEquals(2, followerActorContext.getCommitIndex());
1110 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1111 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1113 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1114 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1115 new FiniteDuration(1000, TimeUnit.SECONDS));
1117 leaderActorContext.setReplicatedLog(
1118 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1119 long leaderCommitIndex = 2;
1120 leaderActorContext.setCommitIndex(leaderCommitIndex);
1121 leaderActorContext.setLastApplied(leaderCommitIndex);
1123 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1124 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1126 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1128 followerActorContext.setReplicatedLog(
1129 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1130 followerActorContext.setCommitIndex(0);
1131 followerActorContext.setLastApplied(0);
1133 Follower follower = new Follower(followerActorContext);
1134 followerActor.underlyingActor().setBehavior(follower);
1136 leader = new Leader(leaderActorContext);
1138 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1139 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1141 MessageCollectorActor.clearMessages(followerActor);
1142 MessageCollectorActor.clearMessages(leaderActor);
1144 // Verify initial AppendEntries sent with the leader's current commit index.
1145 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1146 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1147 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1149 leaderActor.underlyingActor().setBehavior(leader);
1151 leader.handleMessage(followerActor, appendEntriesReply);
1153 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1154 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1156 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1157 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1158 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1160 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1161 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1162 appendEntries.getEntries().get(0).getData());
1163 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1164 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1165 appendEntries.getEntries().get(1).getData());
1167 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1168 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1170 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1172 ApplyState applyState = applyStateList.get(0);
1173 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1174 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1175 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1176 applyState.getReplicatedLogEntry().getData());
1178 applyState = applyStateList.get(1);
1179 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1180 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1181 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1182 applyState.getReplicatedLogEntry().getData());
1184 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1185 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1189 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1190 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1192 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1193 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1194 new FiniteDuration(1000, TimeUnit.SECONDS));
1196 leaderActorContext.setReplicatedLog(
1197 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1198 long leaderCommitIndex = 1;
1199 leaderActorContext.setCommitIndex(leaderCommitIndex);
1200 leaderActorContext.setLastApplied(leaderCommitIndex);
1202 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1203 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1205 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1207 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1208 followerActorContext.setCommitIndex(-1);
1209 followerActorContext.setLastApplied(-1);
1211 Follower follower = new Follower(followerActorContext);
1212 followerActor.underlyingActor().setBehavior(follower);
1214 leader = new Leader(leaderActorContext);
1216 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1217 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1219 MessageCollectorActor.clearMessages(followerActor);
1220 MessageCollectorActor.clearMessages(leaderActor);
1222 // Verify initial AppendEntries sent with the leader's current commit index.
1223 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1224 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1225 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1227 leaderActor.underlyingActor().setBehavior(leader);
1229 leader.handleMessage(followerActor, appendEntriesReply);
1231 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1232 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1234 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1235 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1236 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1238 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1239 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1240 appendEntries.getEntries().get(0).getData());
1241 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1242 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1243 appendEntries.getEntries().get(1).getData());
1245 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1246 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1248 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1250 ApplyState applyState = applyStateList.get(0);
1251 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1252 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1253 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1254 applyState.getReplicatedLogEntry().getData());
1256 applyState = applyStateList.get(1);
1257 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1258 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1259 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1260 applyState.getReplicatedLogEntry().getData());
1262 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1263 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1267 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1268 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1270 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1271 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1272 new FiniteDuration(1000, TimeUnit.SECONDS));
1274 leaderActorContext.setReplicatedLog(
1275 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1276 long leaderCommitIndex = 1;
1277 leaderActorContext.setCommitIndex(leaderCommitIndex);
1278 leaderActorContext.setLastApplied(leaderCommitIndex);
1280 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1281 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1283 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1285 followerActorContext.setReplicatedLog(
1286 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1287 followerActorContext.setCommitIndex(-1);
1288 followerActorContext.setLastApplied(-1);
1290 Follower follower = new Follower(followerActorContext);
1291 followerActor.underlyingActor().setBehavior(follower);
1293 leader = new Leader(leaderActorContext);
1295 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1296 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1298 MessageCollectorActor.clearMessages(followerActor);
1299 MessageCollectorActor.clearMessages(leaderActor);
1301 // Verify initial AppendEntries sent with the leader's current commit index.
1302 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1303 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1304 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1306 leaderActor.underlyingActor().setBehavior(leader);
1308 leader.handleMessage(followerActor, appendEntriesReply);
1310 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1311 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1313 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1314 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1315 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1317 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1318 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1319 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1320 appendEntries.getEntries().get(0).getData());
1321 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1322 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1323 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1324 appendEntries.getEntries().get(1).getData());
1326 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1327 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1329 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1331 ApplyState applyState = applyStateList.get(0);
1332 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1333 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1334 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1335 applyState.getReplicatedLogEntry().getData());
1337 applyState = applyStateList.get(1);
1338 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1339 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1340 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1341 applyState.getReplicatedLogEntry().getData());
1343 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1344 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1345 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1349 public void testHandleAppendEntriesReplySuccess() throws Exception {
1350 logStart("testHandleAppendEntriesReplySuccess");
1352 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1354 leaderActorContext.setReplicatedLog(
1355 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1357 leaderActorContext.setCommitIndex(1);
1358 leaderActorContext.setLastApplied(1);
1359 leaderActorContext.getTermInformation().update(1, "leader");
1361 leader = new Leader(leaderActorContext);
1363 short payloadVersion = 5;
1364 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1366 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1368 assertEquals(RaftState.Leader, raftActorBehavior.state());
1370 assertEquals(2, leaderActorContext.getCommitIndex());
1372 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1373 leaderActor, ApplyJournalEntries.class);
1375 assertEquals(2, leaderActorContext.getLastApplied());
1377 assertEquals(2, applyJournalEntries.getToIndex());
1379 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1382 assertEquals(1,applyStateList.size());
1384 ApplyState applyState = applyStateList.get(0);
1386 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1388 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1389 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1393 public void testHandleAppendEntriesReplyUnknownFollower(){
1394 logStart("testHandleAppendEntriesReplyUnknownFollower");
1396 MockRaftActorContext leaderActorContext = createActorContext();
1398 leader = new Leader(leaderActorContext);
1400 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1402 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1404 assertEquals(RaftState.Leader, raftActorBehavior.state());
1408 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1409 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1411 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1412 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1413 new FiniteDuration(1000, TimeUnit.SECONDS));
1414 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2);
1416 leaderActorContext.setReplicatedLog(
1417 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1418 long leaderCommitIndex = 3;
1419 leaderActorContext.setCommitIndex(leaderCommitIndex);
1420 leaderActorContext.setLastApplied(leaderCommitIndex);
1422 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1423 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1424 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1425 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1427 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1429 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1430 followerActorContext.setCommitIndex(-1);
1431 followerActorContext.setLastApplied(-1);
1433 Follower follower = new Follower(followerActorContext);
1434 followerActor.underlyingActor().setBehavior(follower);
1436 leader = new Leader(leaderActorContext);
1438 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1439 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1441 MessageCollectorActor.clearMessages(followerActor);
1442 MessageCollectorActor.clearMessages(leaderActor);
1444 // Verify initial AppendEntries sent with the leader's current commit index.
1445 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1446 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1447 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1449 leaderActor.underlyingActor().setBehavior(leader);
1451 leader.handleMessage(followerActor, appendEntriesReply);
1453 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1454 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1456 appendEntries = appendEntriesList.get(0);
1457 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1458 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1459 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1461 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1462 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1463 appendEntries.getEntries().get(0).getData());
1464 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1465 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1466 appendEntries.getEntries().get(1).getData());
1468 appendEntries = appendEntriesList.get(1);
1469 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1470 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1471 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1473 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1474 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1475 appendEntries.getEntries().get(0).getData());
1476 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1477 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1478 appendEntries.getEntries().get(1).getData());
1480 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1481 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1483 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1485 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1486 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1490 public void testHandleRequestVoteReply(){
1491 logStart("testHandleRequestVoteReply");
1493 MockRaftActorContext leaderActorContext = createActorContext();
1495 leader = new Leader(leaderActorContext);
1497 // Should be a no-op.
1498 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1499 new RequestVoteReply(1, true));
1501 assertEquals(RaftState.Leader, raftActorBehavior.state());
1503 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1505 assertEquals(RaftState.Leader, raftActorBehavior.state());
1509 public void testIsolatedLeaderCheckNoFollowers() {
1510 logStart("testIsolatedLeaderCheckNoFollowers");
1512 MockRaftActorContext leaderActorContext = createActorContext();
1514 leader = new Leader(leaderActorContext);
1515 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1516 Assert.assertTrue(behavior instanceof Leader);
1520 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1521 logStart("testIsolatedLeaderCheckTwoFollowers");
1523 new JavaTestKit(getSystem()) {{
1525 ActorRef followerActor1 = getTestActor();
1526 ActorRef followerActor2 = getTestActor();
1528 MockRaftActorContext leaderActorContext = createActorContext();
1530 Map<String, String> peerAddresses = new HashMap<>();
1531 peerAddresses.put("follower-1", followerActor1.path().toString());
1532 peerAddresses.put("follower-2", followerActor2.path().toString());
1534 leaderActorContext.setPeerAddresses(peerAddresses);
1536 leader = new Leader(leaderActorContext);
1538 leader.markFollowerActive("follower-1");
1539 leader.markFollowerActive("follower-2");
1540 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1541 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1542 behavior instanceof Leader);
1544 // kill 1 follower and verify if that got killed
1545 final JavaTestKit probe = new JavaTestKit(getSystem());
1546 probe.watch(followerActor1);
1547 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1548 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1549 assertEquals(termMsg1.getActor(), followerActor1);
1551 leader.markFollowerInActive("follower-1");
1552 leader.markFollowerActive("follower-2");
1553 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1554 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1555 behavior instanceof Leader);
1557 // kill 2nd follower and leader should change to Isolated leader
1558 followerActor2.tell(PoisonPill.getInstance(), null);
1559 probe.watch(followerActor2);
1560 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1561 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1562 assertEquals(termMsg2.getActor(), followerActor2);
1564 leader.markFollowerInActive("follower-2");
1565 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1566 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1567 behavior instanceof IsolatedLeader);
1572 public void testLaggingFollowerStarvation() throws Exception {
1573 logStart("testLaggingFollowerStarvation");
1574 new JavaTestKit(getSystem()) {{
1575 String leaderActorId = actorFactory.generateActorId("leader");
1576 String follower1ActorId = actorFactory.generateActorId("follower");
1577 String follower2ActorId = actorFactory.generateActorId("follower");
1579 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1580 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1581 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1582 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1584 MockRaftActorContext leaderActorContext =
1585 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1587 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1588 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1589 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1591 leaderActorContext.setConfigParams(configParams);
1593 leaderActorContext.setReplicatedLog(
1594 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1596 Map<String, String> peerAddresses = new HashMap<>();
1597 peerAddresses.put(follower1ActorId,
1598 follower1Actor.path().toString());
1599 peerAddresses.put(follower2ActorId,
1600 follower2Actor.path().toString());
1602 leaderActorContext.setPeerAddresses(peerAddresses);
1603 leaderActorContext.getTermInformation().update(1, leaderActorId);
1605 RaftActorBehavior leader = createBehavior(leaderActorContext);
1607 leaderActor.underlyingActor().setBehavior(leader);
1609 for(int i=1;i<6;i++) {
1610 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1611 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1612 assertTrue(newBehavior == leader);
1613 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1616 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1617 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1619 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1620 heartbeats.size() > 1);
1622 // Check if follower-2 got AppendEntries during this time and was not starved
1623 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1625 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1626 appendEntries.size() > 1);
1632 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1633 ActorRef actorRef, RaftRPC rpc) throws Exception {
1634 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1635 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1638 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1640 private final long electionTimeOutIntervalMillis;
1641 private final int snapshotChunkSize;
1643 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1645 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1646 this.snapshotChunkSize = snapshotChunkSize;
1650 public FiniteDuration getElectionTimeOutInterval() {
1651 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1655 public int getSnapshotChunkSize() {
1656 return snapshotChunkSize;