2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.mock;
16 import static org.mockito.Mockito.never;
17 import static org.mockito.Mockito.verify;
18 import akka.actor.ActorRef;
19 import akka.actor.PoisonPill;
20 import akka.actor.Props;
21 import akka.actor.Terminated;
22 import akka.testkit.JavaTestKit;
23 import akka.testkit.TestActorRef;
24 import com.google.common.collect.ImmutableMap;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import com.google.protobuf.ByteString;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.List;
31 import java.util.concurrent.TimeUnit;
32 import org.junit.After;
33 import org.junit.Assert;
34 import org.junit.Test;
35 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
36 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
37 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
38 import org.opendaylight.controller.cluster.raft.RaftActorContext;
39 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
40 import org.opendaylight.controller.cluster.raft.RaftState;
41 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
42 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
43 import org.opendaylight.controller.cluster.raft.SerializationUtils;
44 import org.opendaylight.controller.cluster.raft.Snapshot;
45 import org.opendaylight.controller.cluster.raft.VotingState;
46 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
47 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
48 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
49 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
50 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
51 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
54 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
57 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
59 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
60 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
61 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
62 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
63 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
64 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
65 import scala.concurrent.duration.FiniteDuration;
67 public class LeaderTest extends AbstractLeaderTest {
69 static final String FOLLOWER_ID = "follower";
70 public static final String LEADER_ID = "leader";
72 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
73 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
75 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
76 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
78 private Leader leader;
79 private final short payloadVersion = 5;
83 public void tearDown() throws Exception {
92 public void testHandleMessageForUnknownMessage() throws Exception {
93 logStart("testHandleMessageForUnknownMessage");
95 leader = new Leader(createActorContext());
97 // handle message should return the Leader state when it receives an
99 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
100 Assert.assertTrue(behavior instanceof Leader);
104 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
105 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
107 MockRaftActorContext actorContext = createActorContextWithFollower();
108 short payloadVersion = (short)5;
109 actorContext.setPayloadVersion(payloadVersion);
112 actorContext.getTermInformation().update(term, "");
114 leader = new Leader(actorContext);
116 // Leader should send an immediate heartbeat with no entries as follower is inactive.
117 long lastIndex = actorContext.getReplicatedLog().lastIndex();
118 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
119 assertEquals("getTerm", term, appendEntries.getTerm());
120 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
121 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
122 assertEquals("Entries size", 0, appendEntries.getEntries().size());
123 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
125 // The follower would normally reply - simulate that explicitly here.
126 leader.handleMessage(followerActor, new AppendEntriesReply(
127 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
128 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
130 followerActor.underlyingActor().clear();
132 // Sleep for the heartbeat interval so AppendEntries is sent.
133 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
134 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
136 leader.handleMessage(leaderActor, new SendHeartBeat());
138 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
139 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
140 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
141 assertEquals("Entries size", 1, appendEntries.getEntries().size());
142 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
143 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
144 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
148 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
149 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
150 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
152 actorContext.getReplicatedLog().append(newEntry);
153 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
157 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
158 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
160 MockRaftActorContext actorContext = createActorContextWithFollower();
163 actorContext.getTermInformation().update(term, "");
165 leader = new Leader(actorContext);
167 // Leader will send an immediate heartbeat - ignore it.
168 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
170 // The follower would normally reply - simulate that explicitly here.
171 long lastIndex = actorContext.getReplicatedLog().lastIndex();
172 leader.handleMessage(followerActor, new AppendEntriesReply(
173 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
174 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
176 followerActor.underlyingActor().clear();
178 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
180 // State should not change
181 assertTrue(raftBehavior instanceof Leader);
183 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
184 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
185 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
186 assertEquals("Entries size", 1, appendEntries.getEntries().size());
187 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
188 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
189 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
190 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
194 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
195 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
197 MockRaftActorContext actorContext = createActorContextWithFollower();
198 actorContext.setRaftPolicy(createRaftPolicy(true, true));
201 actorContext.getTermInformation().update(term, "");
203 leader = new Leader(actorContext);
205 // Leader will send an immediate heartbeat - ignore it.
206 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
208 // The follower would normally reply - simulate that explicitly here.
209 long lastIndex = actorContext.getReplicatedLog().lastIndex();
210 leader.handleMessage(followerActor, new AppendEntriesReply(
211 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
212 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
214 followerActor.underlyingActor().clear();
216 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
218 // State should not change
219 assertTrue(raftBehavior instanceof Leader);
221 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
222 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
223 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
224 assertEquals("Entries size", 1, appendEntries.getEntries().size());
225 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
226 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
227 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
228 assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
232 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
233 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
235 MockRaftActorContext actorContext = createActorContextWithFollower();
236 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
238 public FiniteDuration getHeartBeatInterval() {
239 return FiniteDuration.apply(5, TimeUnit.SECONDS);
244 actorContext.getTermInformation().update(term, "");
246 leader = new Leader(actorContext);
248 // Leader will send an immediate heartbeat - ignore it.
249 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
251 // The follower would normally reply - simulate that explicitly here.
252 long lastIndex = actorContext.getReplicatedLog().lastIndex();
253 leader.handleMessage(followerActor, new AppendEntriesReply(
254 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
255 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
257 followerActor.underlyingActor().clear();
259 for(int i=0;i<5;i++) {
260 sendReplicate(actorContext, lastIndex+i+1);
263 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
264 // We expect only 1 message to be sent because of two reasons,
265 // - an append entries reply was not received
266 // - the heartbeat interval has not expired
267 // In this scenario if multiple messages are sent they would likely be duplicates
268 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
272 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
273 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
275 MockRaftActorContext actorContext = createActorContextWithFollower();
276 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
278 public FiniteDuration getHeartBeatInterval() {
279 return FiniteDuration.apply(5, TimeUnit.SECONDS);
284 actorContext.getTermInformation().update(term, "");
286 leader = new Leader(actorContext);
288 // Leader will send an immediate heartbeat - ignore it.
289 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
291 // The follower would normally reply - simulate that explicitly here.
292 long lastIndex = actorContext.getReplicatedLog().lastIndex();
293 leader.handleMessage(followerActor, new AppendEntriesReply(
294 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
295 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
297 followerActor.underlyingActor().clear();
299 for(int i=0;i<3;i++) {
300 sendReplicate(actorContext, lastIndex+i+1);
301 leader.handleMessage(followerActor, new AppendEntriesReply(
302 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
306 for(int i=3;i<5;i++) {
307 sendReplicate(actorContext, lastIndex + i + 1);
310 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
311 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
312 // get sent to the follower - but not the 5th
313 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
315 for(int i=0;i<4;i++) {
316 long expected = allMessages.get(i).getEntries().get(0).getIndex();
317 assertEquals(expected, i+2);
322 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
323 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
325 MockRaftActorContext actorContext = createActorContextWithFollower();
326 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
328 public FiniteDuration getHeartBeatInterval() {
329 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
334 actorContext.getTermInformation().update(term, "");
336 leader = new Leader(actorContext);
338 // Leader will send an immediate heartbeat - ignore it.
339 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
341 // The follower would normally reply - simulate that explicitly here.
342 long lastIndex = actorContext.getReplicatedLog().lastIndex();
343 leader.handleMessage(followerActor, new AppendEntriesReply(
344 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
345 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
347 followerActor.underlyingActor().clear();
349 sendReplicate(actorContext, lastIndex+1);
351 // Wait slightly longer than heartbeat duration
352 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
354 leader.handleMessage(leaderActor, new SendHeartBeat());
356 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
357 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
359 assertEquals(1, allMessages.get(0).getEntries().size());
360 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
361 assertEquals(1, allMessages.get(1).getEntries().size());
362 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
367 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
368 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
370 MockRaftActorContext actorContext = createActorContextWithFollower();
371 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
373 public FiniteDuration getHeartBeatInterval() {
374 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
379 actorContext.getTermInformation().update(term, "");
381 leader = new Leader(actorContext);
383 // Leader will send an immediate heartbeat - ignore it.
384 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
386 // The follower would normally reply - simulate that explicitly here.
387 long lastIndex = actorContext.getReplicatedLog().lastIndex();
388 leader.handleMessage(followerActor, new AppendEntriesReply(
389 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
390 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
392 followerActor.underlyingActor().clear();
394 for(int i=0;i<3;i++) {
395 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
396 leader.handleMessage(leaderActor, new SendHeartBeat());
399 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
400 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
404 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
405 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
407 MockRaftActorContext actorContext = createActorContextWithFollower();
408 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
410 public FiniteDuration getHeartBeatInterval() {
411 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
416 actorContext.getTermInformation().update(term, "");
418 leader = new Leader(actorContext);
420 // Leader will send an immediate heartbeat - ignore it.
421 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
423 // The follower would normally reply - simulate that explicitly here.
424 long lastIndex = actorContext.getReplicatedLog().lastIndex();
425 leader.handleMessage(followerActor, new AppendEntriesReply(
426 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
427 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
429 followerActor.underlyingActor().clear();
431 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
432 leader.handleMessage(leaderActor, new SendHeartBeat());
433 sendReplicate(actorContext, lastIndex+1);
435 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
436 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
438 assertEquals(0, allMessages.get(0).getEntries().size());
439 assertEquals(1, allMessages.get(1).getEntries().size());
444 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
445 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
447 MockRaftActorContext actorContext = createActorContext();
449 leader = new Leader(actorContext);
451 actorContext.setLastApplied(0);
453 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
454 long term = actorContext.getTermInformation().getCurrentTerm();
455 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
456 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
458 actorContext.getReplicatedLog().append(newEntry);
460 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
461 new Replicate(leaderActor, "state-id", newEntry));
463 // State should not change
464 assertTrue(raftBehavior instanceof Leader);
466 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
468 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
469 // one since lastApplied state is 0.
470 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
471 leaderActor, ApplyState.class);
472 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
474 for(int i = 0; i <= newLogIndex - 1; i++ ) {
475 ApplyState applyState = applyStateList.get(i);
476 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
477 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
480 ApplyState last = applyStateList.get((int) newLogIndex - 1);
481 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
482 assertEquals("getIdentifier", "state-id", last.getIdentifier());
486 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
487 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
489 MockRaftActorContext actorContext = createActorContextWithFollower();
491 Map<String, String> leadersSnapshot = new HashMap<>();
492 leadersSnapshot.put("1", "A");
493 leadersSnapshot.put("2", "B");
494 leadersSnapshot.put("3", "C");
497 actorContext.getReplicatedLog().removeFrom(0);
499 final int commitIndex = 3;
500 final int snapshotIndex = 2;
501 final int newEntryIndex = 4;
502 final int snapshotTerm = 1;
503 final int currentTerm = 2;
505 // set the snapshot variables in replicatedlog
506 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
507 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
508 actorContext.setCommitIndex(commitIndex);
509 //set follower timeout to 2 mins, helps during debugging
510 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
512 leader = new Leader(actorContext);
514 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
515 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
518 ReplicatedLogImplEntry entry =
519 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
520 new MockRaftActorContext.MockPayload("D"));
522 //update follower timestamp
523 leader.markFollowerActive(FOLLOWER_ID);
525 ByteString bs = toByteString(leadersSnapshot);
526 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
527 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
528 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
529 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
531 //send first chunk and no InstallSnapshotReply received yet
533 fts.incrementChunkIndex();
535 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
536 TimeUnit.MILLISECONDS);
538 leader.handleMessage(leaderActor, new SendHeartBeat());
540 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
542 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
544 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
546 //InstallSnapshotReply received
547 fts.markSendStatus(true);
549 leader.handleMessage(leaderActor, new SendHeartBeat());
551 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
553 assertEquals(commitIndex, is.getLastIncludedIndex());
557 public void testSendAppendEntriesSnapshotScenario() throws Exception {
558 logStart("testSendAppendEntriesSnapshotScenario");
560 MockRaftActorContext actorContext = createActorContextWithFollower();
562 Map<String, String> leadersSnapshot = new HashMap<>();
563 leadersSnapshot.put("1", "A");
564 leadersSnapshot.put("2", "B");
565 leadersSnapshot.put("3", "C");
568 actorContext.getReplicatedLog().removeFrom(0);
570 final int followersLastIndex = 2;
571 final int snapshotIndex = 3;
572 final int newEntryIndex = 4;
573 final int snapshotTerm = 1;
574 final int currentTerm = 2;
576 // set the snapshot variables in replicatedlog
577 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
578 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
579 actorContext.setCommitIndex(followersLastIndex);
581 leader = new Leader(actorContext);
583 // Leader will send an immediate heartbeat - ignore it.
584 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
587 ReplicatedLogImplEntry entry =
588 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
589 new MockRaftActorContext.MockPayload("D"));
591 actorContext.getReplicatedLog().append(entry);
593 //update follower timestamp
594 leader.markFollowerActive(FOLLOWER_ID);
596 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
597 RaftActorBehavior raftBehavior = leader.handleMessage(
598 leaderActor, new Replicate(null, "state-id", entry));
600 assertTrue(raftBehavior instanceof Leader);
602 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
606 public void testInitiateInstallSnapshot() throws Exception {
607 logStart("testInitiateInstallSnapshot");
609 MockRaftActorContext actorContext = createActorContextWithFollower();
612 actorContext.getReplicatedLog().removeFrom(0);
614 final int followersLastIndex = 2;
615 final int snapshotIndex = 3;
616 final int newEntryIndex = 4;
617 final int snapshotTerm = 1;
618 final int currentTerm = 2;
620 // set the snapshot variables in replicatedlog
621 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
622 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
623 actorContext.setLastApplied(3);
624 actorContext.setCommitIndex(followersLastIndex);
626 leader = new Leader(actorContext);
628 // Leader will send an immediate heartbeat - ignore it.
629 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
631 // set the snapshot as absent and check if capture-snapshot is invoked.
632 leader.setSnapshot(null);
635 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
636 new MockRaftActorContext.MockPayload("D"));
638 actorContext.getReplicatedLog().append(entry);
640 //update follower timestamp
641 leader.markFollowerActive(FOLLOWER_ID);
643 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
645 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
647 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
649 assertTrue(cs.isInstallSnapshotInitiated());
650 assertEquals(3, cs.getLastAppliedIndex());
651 assertEquals(1, cs.getLastAppliedTerm());
652 assertEquals(4, cs.getLastIndex());
653 assertEquals(2, cs.getLastTerm());
655 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
656 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
658 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
662 public void testInitiateForceInstallSnapshot() throws Exception {
663 logStart("testInitiateForceInstallSnapshot");
665 MockRaftActorContext actorContext = createActorContextWithFollower();
667 final int followersLastIndex = 2;
668 final int snapshotIndex = -1;
669 final int newEntryIndex = 4;
670 final int snapshotTerm = -1;
671 final int currentTerm = 2;
673 // set the snapshot variables in replicatedlog
674 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
675 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
676 actorContext.setLastApplied(3);
677 actorContext.setCommitIndex(followersLastIndex);
679 actorContext.getReplicatedLog().removeFrom(0);
681 leader = new Leader(actorContext);
683 // Leader will send an immediate heartbeat - ignore it.
684 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
686 // set the snapshot as absent and check if capture-snapshot is invoked.
687 leader.setSnapshot(null);
689 for(int i=0;i<4;i++) {
690 actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
691 new MockRaftActorContext.MockPayload("X" + i)));
695 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
696 new MockRaftActorContext.MockPayload("D"));
698 actorContext.getReplicatedLog().append(entry);
700 //update follower timestamp
701 leader.markFollowerActive(FOLLOWER_ID);
703 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
704 // installed with a SendInstallSnapshot
705 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
707 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
709 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
711 assertTrue(cs.isInstallSnapshotInitiated());
712 assertEquals(3, cs.getLastAppliedIndex());
713 assertEquals(1, cs.getLastAppliedTerm());
714 assertEquals(4, cs.getLastIndex());
715 assertEquals(2, cs.getLastTerm());
717 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
718 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
720 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
725 public void testInstallSnapshot() throws Exception {
726 logStart("testInstallSnapshot");
728 MockRaftActorContext actorContext = createActorContextWithFollower();
730 Map<String, String> leadersSnapshot = new HashMap<>();
731 leadersSnapshot.put("1", "A");
732 leadersSnapshot.put("2", "B");
733 leadersSnapshot.put("3", "C");
736 actorContext.getReplicatedLog().removeFrom(0);
738 final int lastAppliedIndex = 3;
739 final int snapshotIndex = 2;
740 final int snapshotTerm = 1;
741 final int currentTerm = 2;
743 // set the snapshot variables in replicatedlog
744 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
745 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
746 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
747 actorContext.setCommitIndex(lastAppliedIndex);
748 actorContext.setLastApplied(lastAppliedIndex);
750 leader = new Leader(actorContext);
752 // Initial heartbeat.
753 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
755 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
756 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
758 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
759 Collections.<ReplicatedLogEntry>emptyList(),
760 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
762 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
764 assertTrue(raftBehavior instanceof Leader);
766 // check if installsnapshot gets called with the correct values.
768 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
770 assertNotNull(installSnapshot.getData());
771 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
772 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
774 assertEquals(currentTerm, installSnapshot.getTerm());
778 public void testForceInstallSnapshot() throws Exception {
779 logStart("testForceInstallSnapshot");
781 MockRaftActorContext actorContext = createActorContextWithFollower();
783 Map<String, String> leadersSnapshot = new HashMap<>();
784 leadersSnapshot.put("1", "A");
785 leadersSnapshot.put("2", "B");
786 leadersSnapshot.put("3", "C");
788 final int lastAppliedIndex = 3;
789 final int snapshotIndex = -1;
790 final int snapshotTerm = -1;
791 final int currentTerm = 2;
793 // set the snapshot variables in replicatedlog
794 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
795 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
796 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
797 actorContext.setCommitIndex(lastAppliedIndex);
798 actorContext.setLastApplied(lastAppliedIndex);
800 leader = new Leader(actorContext);
802 // Initial heartbeat.
803 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
805 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
806 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
808 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
809 Collections.<ReplicatedLogEntry>emptyList(),
810 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
812 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
814 assertTrue(raftBehavior instanceof Leader);
816 // check if installsnapshot gets called with the correct values.
818 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
820 assertNotNull(installSnapshot.getData());
821 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
822 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
824 assertEquals(currentTerm, installSnapshot.getTerm());
828 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
829 logStart("testHandleInstallSnapshotReplyLastChunk");
831 MockRaftActorContext actorContext = createActorContextWithFollower();
833 final int commitIndex = 3;
834 final int snapshotIndex = 2;
835 final int snapshotTerm = 1;
836 final int currentTerm = 2;
838 actorContext.setCommitIndex(commitIndex);
840 leader = new Leader(actorContext);
842 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
843 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
845 // Ignore initial heartbeat.
846 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
848 Map<String, String> leadersSnapshot = new HashMap<>();
849 leadersSnapshot.put("1", "A");
850 leadersSnapshot.put("2", "B");
851 leadersSnapshot.put("3", "C");
853 // set the snapshot variables in replicatedlog
855 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
856 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
857 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
859 ByteString bs = toByteString(leadersSnapshot);
860 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
861 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
862 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
863 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
864 while(!fts.isLastChunk(fts.getChunkIndex())) {
866 fts.incrementChunkIndex();
870 actorContext.getReplicatedLog().removeFrom(0);
872 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
873 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
875 assertTrue(raftBehavior instanceof Leader);
877 assertEquals(0, leader.followerSnapshotSize());
878 assertEquals(1, leader.followerLogSize());
879 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
881 assertEquals(commitIndex, fli.getMatchIndex());
882 assertEquals(commitIndex + 1, fli.getNextIndex());
886 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
887 logStart("testSendSnapshotfromInstallSnapshotReply");
889 MockRaftActorContext actorContext = createActorContextWithFollower();
891 final int commitIndex = 3;
892 final int snapshotIndex = 2;
893 final int snapshotTerm = 1;
894 final int currentTerm = 2;
896 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
898 public int getSnapshotChunkSize() {
902 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
903 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
905 actorContext.setConfigParams(configParams);
906 actorContext.setCommitIndex(commitIndex);
908 leader = new Leader(actorContext);
910 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
911 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
913 Map<String, String> leadersSnapshot = new HashMap<>();
914 leadersSnapshot.put("1", "A");
915 leadersSnapshot.put("2", "B");
916 leadersSnapshot.put("3", "C");
918 // set the snapshot variables in replicatedlog
919 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
920 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
921 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
923 ByteString bs = toByteString(leadersSnapshot);
924 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
925 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
926 leader.setSnapshot(snapshot);
928 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
930 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
932 assertEquals(1, installSnapshot.getChunkIndex());
933 assertEquals(3, installSnapshot.getTotalChunks());
935 followerActor.underlyingActor().clear();
936 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
937 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
939 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
941 assertEquals(2, installSnapshot.getChunkIndex());
942 assertEquals(3, installSnapshot.getTotalChunks());
944 followerActor.underlyingActor().clear();
945 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
946 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
948 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
950 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
951 followerActor.underlyingActor().clear();
952 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
953 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
955 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
957 Assert.assertNull(installSnapshot);
962 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
963 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
965 MockRaftActorContext actorContext = createActorContextWithFollower();
967 final int commitIndex = 3;
968 final int snapshotIndex = 2;
969 final int snapshotTerm = 1;
970 final int currentTerm = 2;
972 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
974 public int getSnapshotChunkSize() {
979 actorContext.setCommitIndex(commitIndex);
981 leader = new Leader(actorContext);
983 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
984 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
986 Map<String, String> leadersSnapshot = new HashMap<>();
987 leadersSnapshot.put("1", "A");
988 leadersSnapshot.put("2", "B");
989 leadersSnapshot.put("3", "C");
991 // set the snapshot variables in replicatedlog
992 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
993 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
994 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
996 ByteString bs = toByteString(leadersSnapshot);
997 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
998 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
999 leader.setSnapshot(snapshot);
1001 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1002 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1004 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1006 assertEquals(1, installSnapshot.getChunkIndex());
1007 assertEquals(3, installSnapshot.getTotalChunks());
1009 followerActor.underlyingActor().clear();
1011 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1012 FOLLOWER_ID, -1, false));
1014 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1015 TimeUnit.MILLISECONDS);
1017 leader.handleMessage(leaderActor, new SendHeartBeat());
1019 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1021 assertEquals(1, installSnapshot.getChunkIndex());
1022 assertEquals(3, installSnapshot.getTotalChunks());
1026 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1027 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1029 MockRaftActorContext actorContext = createActorContextWithFollower();
1031 final int commitIndex = 3;
1032 final int snapshotIndex = 2;
1033 final int snapshotTerm = 1;
1034 final int currentTerm = 2;
1036 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1038 public int getSnapshotChunkSize() {
1043 actorContext.setCommitIndex(commitIndex);
1045 leader = new Leader(actorContext);
1047 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1048 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1050 Map<String, String> leadersSnapshot = new HashMap<>();
1051 leadersSnapshot.put("1", "A");
1052 leadersSnapshot.put("2", "B");
1053 leadersSnapshot.put("3", "C");
1055 // set the snapshot variables in replicatedlog
1056 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1057 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1058 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1060 ByteString bs = toByteString(leadersSnapshot);
1061 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1062 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1063 leader.setSnapshot(snapshot);
1065 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1067 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1069 assertEquals(1, installSnapshot.getChunkIndex());
1070 assertEquals(3, installSnapshot.getTotalChunks());
1071 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
1073 int hashCode = installSnapshot.getData().hashCode();
1075 followerActor.underlyingActor().clear();
1077 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1078 FOLLOWER_ID, 1, true));
1080 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1082 assertEquals(2, installSnapshot.getChunkIndex());
1083 assertEquals(3, installSnapshot.getTotalChunks());
1084 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1088 public void testFollowerToSnapshotLogic() {
1089 logStart("testFollowerToSnapshotLogic");
1091 MockRaftActorContext actorContext = createActorContext();
1093 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1095 public int getSnapshotChunkSize() {
1100 leader = new Leader(actorContext);
1102 Map<String, String> leadersSnapshot = new HashMap<>();
1103 leadersSnapshot.put("1", "A");
1104 leadersSnapshot.put("2", "B");
1105 leadersSnapshot.put("3", "C");
1107 ByteString bs = toByteString(leadersSnapshot);
1108 byte[] barray = bs.toByteArray();
1110 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1111 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1113 assertEquals(bs.size(), barray.length);
1116 for (int i=0; i < barray.length; i = i + 50) {
1120 if (i + 50 > barray.length) {
1124 ByteString chunk = fts.getNextChunk();
1125 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
1126 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1128 fts.markSendStatus(true);
1129 if (!fts.isLastChunk(chunkIndex)) {
1130 fts.incrementChunkIndex();
1134 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1137 @Override protected RaftActorBehavior createBehavior(
1138 RaftActorContext actorContext) {
1139 return new Leader(actorContext);
1143 protected MockRaftActorContext createActorContext() {
1144 return createActorContext(leaderActor);
1148 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1149 return createActorContext(LEADER_ID, actorRef);
1152 private MockRaftActorContext createActorContextWithFollower() {
1153 MockRaftActorContext actorContext = createActorContext();
1154 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1155 followerActor.path().toString()).build());
1156 return actorContext;
1159 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1160 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1161 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1162 configParams.setElectionTimeoutFactor(100000);
1163 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1164 context.setConfigParams(configParams);
1165 context.setPayloadVersion(payloadVersion);
1169 private MockRaftActorContext createFollowerActorContextWithLeader() {
1170 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1171 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1172 followerConfig.setElectionTimeoutFactor(10000);
1173 followerActorContext.setConfigParams(followerConfig);
1174 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1175 return followerActorContext;
1179 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1180 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1182 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1184 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1186 Follower follower = new Follower(followerActorContext);
1187 followerActor.underlyingActor().setBehavior(follower);
1189 Map<String, String> peerAddresses = new HashMap<>();
1190 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1192 leaderActorContext.setPeerAddresses(peerAddresses);
1194 leaderActorContext.getReplicatedLog().removeFrom(0);
1197 leaderActorContext.setReplicatedLog(
1198 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1200 leaderActorContext.setCommitIndex(1);
1202 followerActorContext.getReplicatedLog().removeFrom(0);
1204 // follower too has the exact same log entries and has the same commit index
1205 followerActorContext.setReplicatedLog(
1206 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1208 followerActorContext.setCommitIndex(1);
1210 leader = new Leader(leaderActorContext);
1212 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1214 assertEquals(1, appendEntries.getLeaderCommit());
1215 assertEquals(0, appendEntries.getEntries().size());
1216 assertEquals(0, appendEntries.getPrevLogIndex());
1218 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1219 leaderActor, AppendEntriesReply.class);
1221 assertEquals(2, appendEntriesReply.getLogLastIndex());
1222 assertEquals(1, appendEntriesReply.getLogLastTerm());
1224 // follower returns its next index
1225 assertEquals(2, appendEntriesReply.getLogLastIndex());
1226 assertEquals(1, appendEntriesReply.getLogLastTerm());
1232 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1233 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1235 MockRaftActorContext leaderActorContext = createActorContext();
1237 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1238 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1240 Follower follower = new Follower(followerActorContext);
1241 followerActor.underlyingActor().setBehavior(follower);
1243 Map<String, String> leaderPeerAddresses = new HashMap<>();
1244 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1246 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1248 leaderActorContext.getReplicatedLog().removeFrom(0);
1250 leaderActorContext.setReplicatedLog(
1251 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1253 leaderActorContext.setCommitIndex(1);
1255 followerActorContext.getReplicatedLog().removeFrom(0);
1257 followerActorContext.setReplicatedLog(
1258 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1260 // follower has the same log entries but its commit index > leaders commit index
1261 followerActorContext.setCommitIndex(2);
1263 leader = new Leader(leaderActorContext);
1265 // Initial heartbeat
1266 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1268 assertEquals(1, appendEntries.getLeaderCommit());
1269 assertEquals(0, appendEntries.getEntries().size());
1270 assertEquals(0, appendEntries.getPrevLogIndex());
1272 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1273 leaderActor, AppendEntriesReply.class);
1275 assertEquals(2, appendEntriesReply.getLogLastIndex());
1276 assertEquals(1, appendEntriesReply.getLogLastTerm());
1278 leaderActor.underlyingActor().setBehavior(follower);
1279 leader.handleMessage(followerActor, appendEntriesReply);
1281 leaderActor.underlyingActor().clear();
1282 followerActor.underlyingActor().clear();
1284 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1285 TimeUnit.MILLISECONDS);
1287 leader.handleMessage(leaderActor, new SendHeartBeat());
1289 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1291 assertEquals(2, appendEntries.getLeaderCommit());
1292 assertEquals(0, appendEntries.getEntries().size());
1293 assertEquals(2, appendEntries.getPrevLogIndex());
1295 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1297 assertEquals(2, appendEntriesReply.getLogLastIndex());
1298 assertEquals(1, appendEntriesReply.getLogLastTerm());
1300 assertEquals(2, followerActorContext.getCommitIndex());
1306 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1307 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1309 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1310 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1311 new FiniteDuration(1000, TimeUnit.SECONDS));
1313 leaderActorContext.setReplicatedLog(
1314 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1315 long leaderCommitIndex = 2;
1316 leaderActorContext.setCommitIndex(leaderCommitIndex);
1317 leaderActorContext.setLastApplied(leaderCommitIndex);
1319 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1320 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1322 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1324 followerActorContext.setReplicatedLog(
1325 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1326 followerActorContext.setCommitIndex(0);
1327 followerActorContext.setLastApplied(0);
1329 Follower follower = new Follower(followerActorContext);
1330 followerActor.underlyingActor().setBehavior(follower);
1332 leader = new Leader(leaderActorContext);
1334 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1335 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1337 MessageCollectorActor.clearMessages(followerActor);
1338 MessageCollectorActor.clearMessages(leaderActor);
1340 // Verify initial AppendEntries sent with the leader's current commit index.
1341 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1342 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1343 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1345 leaderActor.underlyingActor().setBehavior(leader);
1347 leader.handleMessage(followerActor, appendEntriesReply);
1349 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1350 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1352 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1353 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1354 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1356 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1357 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1358 appendEntries.getEntries().get(0).getData());
1359 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1360 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1361 appendEntries.getEntries().get(1).getData());
1363 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1364 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1366 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1368 ApplyState applyState = applyStateList.get(0);
1369 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1370 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1371 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1372 applyState.getReplicatedLogEntry().getData());
1374 applyState = applyStateList.get(1);
1375 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1376 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1377 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1378 applyState.getReplicatedLogEntry().getData());
1380 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1381 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1385 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1386 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1388 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1389 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1390 new FiniteDuration(1000, TimeUnit.SECONDS));
1392 leaderActorContext.setReplicatedLog(
1393 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1394 long leaderCommitIndex = 1;
1395 leaderActorContext.setCommitIndex(leaderCommitIndex);
1396 leaderActorContext.setLastApplied(leaderCommitIndex);
1398 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1399 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1401 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1403 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1404 followerActorContext.setCommitIndex(-1);
1405 followerActorContext.setLastApplied(-1);
1407 Follower follower = new Follower(followerActorContext);
1408 followerActor.underlyingActor().setBehavior(follower);
1410 leader = new Leader(leaderActorContext);
1412 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1413 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1415 MessageCollectorActor.clearMessages(followerActor);
1416 MessageCollectorActor.clearMessages(leaderActor);
1418 // Verify initial AppendEntries sent with the leader's current commit index.
1419 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1420 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1421 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1423 leaderActor.underlyingActor().setBehavior(leader);
1425 leader.handleMessage(followerActor, appendEntriesReply);
1427 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1428 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1430 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1431 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1432 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1434 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1435 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1436 appendEntries.getEntries().get(0).getData());
1437 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1438 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1439 appendEntries.getEntries().get(1).getData());
1441 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1442 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1444 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1446 ApplyState applyState = applyStateList.get(0);
1447 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1448 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1449 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1450 applyState.getReplicatedLogEntry().getData());
1452 applyState = applyStateList.get(1);
1453 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1454 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1455 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1456 applyState.getReplicatedLogEntry().getData());
1458 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1459 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1463 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1464 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1466 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1467 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1468 new FiniteDuration(1000, TimeUnit.SECONDS));
1470 leaderActorContext.setReplicatedLog(
1471 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1472 long leaderCommitIndex = 1;
1473 leaderActorContext.setCommitIndex(leaderCommitIndex);
1474 leaderActorContext.setLastApplied(leaderCommitIndex);
1476 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1477 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1479 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1481 followerActorContext.setReplicatedLog(
1482 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1483 followerActorContext.setCommitIndex(-1);
1484 followerActorContext.setLastApplied(-1);
1486 Follower follower = new Follower(followerActorContext);
1487 followerActor.underlyingActor().setBehavior(follower);
1489 leader = new Leader(leaderActorContext);
1491 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1492 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1494 MessageCollectorActor.clearMessages(followerActor);
1495 MessageCollectorActor.clearMessages(leaderActor);
1497 // Verify initial AppendEntries sent with the leader's current commit index.
1498 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1499 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1500 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1502 leaderActor.underlyingActor().setBehavior(leader);
1504 leader.handleMessage(followerActor, appendEntriesReply);
1506 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1507 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1509 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1510 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1511 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1513 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1514 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1515 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1516 appendEntries.getEntries().get(0).getData());
1517 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1518 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1519 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1520 appendEntries.getEntries().get(1).getData());
1522 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1523 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1525 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1527 ApplyState applyState = applyStateList.get(0);
1528 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1529 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1530 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1531 applyState.getReplicatedLogEntry().getData());
1533 applyState = applyStateList.get(1);
1534 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1535 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1536 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1537 applyState.getReplicatedLogEntry().getData());
1539 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1540 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1541 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1545 public void testHandleAppendEntriesReplyWithNewerTerm(){
1546 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1548 MockRaftActorContext leaderActorContext = createActorContext();
1549 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1550 new FiniteDuration(10000, TimeUnit.SECONDS));
1552 leaderActorContext.setReplicatedLog(
1553 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1555 leader = new Leader(leaderActorContext);
1556 leaderActor.underlyingActor().setBehavior(leader);
1557 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1559 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1561 assertEquals(false, appendEntriesReply.isSuccess());
1562 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1564 MessageCollectorActor.clearMessages(leaderActor);
1568 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1569 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1571 MockRaftActorContext leaderActorContext = createActorContext();
1572 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1573 new FiniteDuration(10000, TimeUnit.SECONDS));
1575 leaderActorContext.setReplicatedLog(
1576 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1577 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1579 leader = new Leader(leaderActorContext);
1580 leaderActor.underlyingActor().setBehavior(leader);
1581 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1583 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1585 assertEquals(false, appendEntriesReply.isSuccess());
1586 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1588 MessageCollectorActor.clearMessages(leaderActor);
1592 public void testHandleAppendEntriesReplySuccess() throws Exception {
1593 logStart("testHandleAppendEntriesReplySuccess");
1595 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1597 leaderActorContext.setReplicatedLog(
1598 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1600 leaderActorContext.setCommitIndex(1);
1601 leaderActorContext.setLastApplied(1);
1602 leaderActorContext.getTermInformation().update(1, "leader");
1604 leader = new Leader(leaderActorContext);
1606 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1608 short payloadVersion = 5;
1609 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1611 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1613 assertEquals(RaftState.Leader, raftActorBehavior.state());
1615 assertEquals(2, leaderActorContext.getCommitIndex());
1617 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1618 leaderActor, ApplyJournalEntries.class);
1620 assertEquals(2, leaderActorContext.getLastApplied());
1622 assertEquals(2, applyJournalEntries.getToIndex());
1624 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1627 assertEquals(1,applyStateList.size());
1629 ApplyState applyState = applyStateList.get(0);
1631 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1633 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1634 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1638 public void testHandleAppendEntriesReplyUnknownFollower(){
1639 logStart("testHandleAppendEntriesReplyUnknownFollower");
1641 MockRaftActorContext leaderActorContext = createActorContext();
1643 leader = new Leader(leaderActorContext);
1645 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1647 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1649 assertEquals(RaftState.Leader, raftActorBehavior.state());
1653 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1654 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1656 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1657 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1658 new FiniteDuration(1000, TimeUnit.SECONDS));
1659 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1661 leaderActorContext.setReplicatedLog(
1662 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1663 long leaderCommitIndex = 3;
1664 leaderActorContext.setCommitIndex(leaderCommitIndex);
1665 leaderActorContext.setLastApplied(leaderCommitIndex);
1667 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1668 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1669 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1670 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1672 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1674 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1675 followerActorContext.setCommitIndex(-1);
1676 followerActorContext.setLastApplied(-1);
1678 Follower follower = new Follower(followerActorContext);
1679 followerActor.underlyingActor().setBehavior(follower);
1681 leader = new Leader(leaderActorContext);
1683 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1684 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1686 MessageCollectorActor.clearMessages(followerActor);
1687 MessageCollectorActor.clearMessages(leaderActor);
1689 // Verify initial AppendEntries sent with the leader's current commit index.
1690 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1691 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1692 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1694 leaderActor.underlyingActor().setBehavior(leader);
1696 leader.handleMessage(followerActor, appendEntriesReply);
1698 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1699 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1701 appendEntries = appendEntriesList.get(0);
1702 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1703 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1704 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1706 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1707 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1708 appendEntries.getEntries().get(0).getData());
1709 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1710 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1711 appendEntries.getEntries().get(1).getData());
1713 appendEntries = appendEntriesList.get(1);
1714 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1715 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1716 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1718 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1719 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1720 appendEntries.getEntries().get(0).getData());
1721 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1722 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1723 appendEntries.getEntries().get(1).getData());
1725 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1726 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1728 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1730 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1731 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1735 public void testHandleRequestVoteReply(){
1736 logStart("testHandleRequestVoteReply");
1738 MockRaftActorContext leaderActorContext = createActorContext();
1740 leader = new Leader(leaderActorContext);
1742 // Should be a no-op.
1743 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1744 new RequestVoteReply(1, true));
1746 assertEquals(RaftState.Leader, raftActorBehavior.state());
1748 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1750 assertEquals(RaftState.Leader, raftActorBehavior.state());
1754 public void testIsolatedLeaderCheckNoFollowers() {
1755 logStart("testIsolatedLeaderCheckNoFollowers");
1757 MockRaftActorContext leaderActorContext = createActorContext();
1759 leader = new Leader(leaderActorContext);
1760 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1761 Assert.assertTrue(behavior instanceof Leader);
1764 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1765 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1766 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1768 MockRaftActorContext leaderActorContext = createActorContext();
1770 Map<String, String> peerAddresses = new HashMap<>();
1771 peerAddresses.put("follower-1", followerActor1.path().toString());
1772 peerAddresses.put("follower-2", followerActor2.path().toString());
1774 leaderActorContext.setPeerAddresses(peerAddresses);
1775 leaderActorContext.setRaftPolicy(raftPolicy);
1777 leader = new Leader(leaderActorContext);
1779 leader.markFollowerActive("follower-1");
1780 leader.markFollowerActive("follower-2");
1781 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1782 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1783 behavior instanceof Leader);
1785 // kill 1 follower and verify if that got killed
1786 final JavaTestKit probe = new JavaTestKit(getSystem());
1787 probe.watch(followerActor1);
1788 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1789 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1790 assertEquals(termMsg1.getActor(), followerActor1);
1792 leader.markFollowerInActive("follower-1");
1793 leader.markFollowerActive("follower-2");
1794 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1795 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1796 behavior instanceof Leader);
1798 // kill 2nd follower and leader should change to Isolated leader
1799 followerActor2.tell(PoisonPill.getInstance(), null);
1800 probe.watch(followerActor2);
1801 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1802 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1803 assertEquals(termMsg2.getActor(), followerActor2);
1805 leader.markFollowerInActive("follower-2");
1806 return leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1810 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1811 logStart("testIsolatedLeaderCheckTwoFollowers");
1813 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1815 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1816 behavior instanceof IsolatedLeader);
1820 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1821 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1823 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1825 Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1826 behavior instanceof Leader);
1830 public void testLaggingFollowerStarvation() throws Exception {
1831 logStart("testLaggingFollowerStarvation");
1832 new JavaTestKit(getSystem()) {{
1833 String leaderActorId = actorFactory.generateActorId("leader");
1834 String follower1ActorId = actorFactory.generateActorId("follower");
1835 String follower2ActorId = actorFactory.generateActorId("follower");
1837 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1838 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1839 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1840 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1842 MockRaftActorContext leaderActorContext =
1843 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1845 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1846 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1847 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1849 leaderActorContext.setConfigParams(configParams);
1851 leaderActorContext.setReplicatedLog(
1852 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1854 Map<String, String> peerAddresses = new HashMap<>();
1855 peerAddresses.put(follower1ActorId,
1856 follower1Actor.path().toString());
1857 peerAddresses.put(follower2ActorId,
1858 follower2Actor.path().toString());
1860 leaderActorContext.setPeerAddresses(peerAddresses);
1861 leaderActorContext.getTermInformation().update(1, leaderActorId);
1863 RaftActorBehavior leader = createBehavior(leaderActorContext);
1865 leaderActor.underlyingActor().setBehavior(leader);
1867 for(int i=1;i<6;i++) {
1868 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1869 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1870 assertTrue(newBehavior == leader);
1871 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1874 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1875 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1877 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1878 heartbeats.size() > 1);
1880 // Check if follower-2 got AppendEntries during this time and was not starved
1881 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1883 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1884 appendEntries.size() > 1);
1890 public void testReplicationConsensusWithNonVotingFollower() {
1891 logStart("testReplicationConsensusWithNonVotingFollower");
1893 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1894 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1895 new FiniteDuration(1000, TimeUnit.SECONDS));
1897 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1899 String nonVotingFollowerId = "nonvoting-follower";
1900 TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1901 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1903 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1905 leader = new Leader(leaderActorContext);
1907 // Ignore initial heartbeats
1908 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1909 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1911 MessageCollectorActor.clearMessages(followerActor);
1912 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1913 MessageCollectorActor.clearMessages(leaderActor);
1915 // Send a Replicate message and wait for AppendEntries.
1916 sendReplicate(leaderActorContext, 0);
1918 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1919 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1921 // Send reply only from the voting follower and verify consensus via ApplyState.
1922 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1924 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1926 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
1928 MessageCollectorActor.clearMessages(followerActor);
1929 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1930 MessageCollectorActor.clearMessages(leaderActor);
1932 // Send another Replicate message
1933 sendReplicate(leaderActorContext, 1);
1935 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1936 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
1937 AppendEntries.class);
1938 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1939 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
1941 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
1942 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
1944 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
1946 // Send reply from the voting follower and verify consensus.
1947 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1949 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1953 public void testTransferLeadershipWithFollowerInSync() {
1954 logStart("testTransferLeadershipWithFollowerInSync");
1956 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1957 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1958 new FiniteDuration(1000, TimeUnit.SECONDS));
1959 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1961 leader = new Leader(leaderActorContext);
1963 // Initial heartbeat
1964 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1965 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
1966 MessageCollectorActor.clearMessages(followerActor);
1968 sendReplicate(leaderActorContext, 0);
1969 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1971 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1972 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1973 MessageCollectorActor.clearMessages(followerActor);
1975 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
1976 doNothing().when(mockTransferCohort).transferComplete();
1977 leader.transferLeadership(mockTransferCohort);
1979 verify(mockTransferCohort, never()).transferComplete();
1980 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1981 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1983 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
1984 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1986 // Leader should force an election timeout
1987 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
1989 verify(mockTransferCohort).transferComplete();
1993 public void testTransferLeadershipWithEmptyLog() {
1994 logStart("testTransferLeadershipWithEmptyLog");
1996 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1997 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1998 new FiniteDuration(1000, TimeUnit.SECONDS));
1999 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2001 leader = new Leader(leaderActorContext);
2003 // Initial heartbeat
2004 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2005 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2006 MessageCollectorActor.clearMessages(followerActor);
2008 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2009 doNothing().when(mockTransferCohort).transferComplete();
2010 leader.transferLeadership(mockTransferCohort);
2012 verify(mockTransferCohort, never()).transferComplete();
2013 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2014 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2016 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2017 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2019 // Leader should force an election timeout
2020 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2022 verify(mockTransferCohort).transferComplete();
2026 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2027 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2029 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2030 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2031 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2033 leader = new Leader(leaderActorContext);
2035 // Initial heartbeat
2036 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2037 MessageCollectorActor.clearMessages(followerActor);
2039 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2040 doNothing().when(mockTransferCohort).transferComplete();
2041 leader.transferLeadership(mockTransferCohort);
2043 verify(mockTransferCohort, never()).transferComplete();
2045 // Sync up the follower.
2046 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2047 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2048 MessageCollectorActor.clearMessages(followerActor);
2050 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2051 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2052 leader.handleMessage(leaderActor, new SendHeartBeat());
2053 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2054 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2056 // Leader should force an election timeout
2057 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2059 verify(mockTransferCohort).transferComplete();
2063 public void testTransferLeadershipWithFollowerSyncTimeout() {
2064 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2066 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2067 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2068 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2069 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2070 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2072 leader = new Leader(leaderActorContext);
2074 // Initial heartbeat
2075 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2076 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2077 MessageCollectorActor.clearMessages(followerActor);
2079 sendReplicate(leaderActorContext, 0);
2080 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2082 MessageCollectorActor.clearMessages(followerActor);
2084 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2085 doNothing().when(mockTransferCohort).abortTransfer();
2086 leader.transferLeadership(mockTransferCohort);
2088 verify(mockTransferCohort, never()).transferComplete();
2090 // Send heartbeats to time out the transfer.
2091 for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2092 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2093 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2094 leader.handleMessage(leaderActor, new SendHeartBeat());
2097 verify(mockTransferCohort).abortTransfer();
2098 verify(mockTransferCohort, never()).transferComplete();
2099 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2103 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
2104 ActorRef actorRef, RaftRPC rpc) throws Exception {
2105 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2106 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2109 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2111 private final long electionTimeOutIntervalMillis;
2112 private final int snapshotChunkSize;
2114 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2116 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2117 this.snapshotChunkSize = snapshotChunkSize;
2121 public FiniteDuration getElectionTimeOutInterval() {
2122 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2126 public int getSnapshotChunkSize() {
2127 return snapshotChunkSize;