1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
35 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
38 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
44 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
45 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
46 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
47 import scala.concurrent.duration.FiniteDuration;
49 public class LeaderTest extends AbstractLeaderTest {
51 static final String FOLLOWER_ID = "follower";
52 public static final String LEADER_ID = "leader";
54 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
55 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
57 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
58 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
60 private Leader leader;
61 private final short payloadVersion = 5;
65 public void tearDown() throws Exception {
74 public void testHandleMessageForUnknownMessage() throws Exception {
75 logStart("testHandleMessageForUnknownMessage");
77 leader = new Leader(createActorContext());
79 // handle message should return the Leader state when it receives an
81 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
82 Assert.assertTrue(behavior instanceof Leader);
86 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
87 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
89 MockRaftActorContext actorContext = createActorContextWithFollower();
90 short payloadVersion = (short)5;
91 actorContext.setPayloadVersion(payloadVersion);
94 actorContext.getTermInformation().update(term, "");
96 leader = new Leader(actorContext);
98 // Leader should send an immediate heartbeat with no entries as follower is inactive.
99 long lastIndex = actorContext.getReplicatedLog().lastIndex();
100 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
101 assertEquals("getTerm", term, appendEntries.getTerm());
102 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
103 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
104 assertEquals("Entries size", 0, appendEntries.getEntries().size());
105 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
107 // The follower would normally reply - simulate that explicitly here.
108 leader.handleMessage(followerActor, new AppendEntriesReply(
109 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
110 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
112 followerActor.underlyingActor().clear();
114 // Sleep for the heartbeat interval so AppendEntries is sent.
115 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
116 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
118 leader.handleMessage(leaderActor, new SendHeartBeat());
120 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
121 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
122 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
123 assertEquals("Entries size", 1, appendEntries.getEntries().size());
124 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
125 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
126 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
130 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
131 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
132 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
134 actorContext.getReplicatedLog().append(newEntry);
135 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
139 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
140 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
142 MockRaftActorContext actorContext = createActorContextWithFollower();
145 actorContext.getTermInformation().update(term, "");
147 leader = new Leader(actorContext);
149 // Leader will send an immediate heartbeat - ignore it.
150 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
152 // The follower would normally reply - simulate that explicitly here.
153 long lastIndex = actorContext.getReplicatedLog().lastIndex();
154 leader.handleMessage(followerActor, new AppendEntriesReply(
155 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
156 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
158 followerActor.underlyingActor().clear();
160 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
162 // State should not change
163 assertTrue(raftBehavior instanceof Leader);
165 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
166 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
167 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
168 assertEquals("Entries size", 1, appendEntries.getEntries().size());
169 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
170 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
171 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
175 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
176 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
178 MockRaftActorContext actorContext = createActorContextWithFollower();
179 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
181 public FiniteDuration getHeartBeatInterval() {
182 return FiniteDuration.apply(5, TimeUnit.SECONDS);
187 actorContext.getTermInformation().update(term, "");
189 leader = new Leader(actorContext);
191 // Leader will send an immediate heartbeat - ignore it.
192 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
194 // The follower would normally reply - simulate that explicitly here.
195 long lastIndex = actorContext.getReplicatedLog().lastIndex();
196 leader.handleMessage(followerActor, new AppendEntriesReply(
197 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
198 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
200 followerActor.underlyingActor().clear();
202 for(int i=0;i<5;i++) {
203 sendReplicate(actorContext, lastIndex+i+1);
206 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
207 // We expect only 1 message to be sent because of two reasons,
208 // - an append entries reply was not received
209 // - the heartbeat interval has not expired
210 // In this scenario if multiple messages are sent they would likely be duplicates
211 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
215 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
216 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
218 MockRaftActorContext actorContext = createActorContextWithFollower();
219 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
221 public FiniteDuration getHeartBeatInterval() {
222 return FiniteDuration.apply(5, TimeUnit.SECONDS);
227 actorContext.getTermInformation().update(term, "");
229 leader = new Leader(actorContext);
231 // Leader will send an immediate heartbeat - ignore it.
232 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
234 // The follower would normally reply - simulate that explicitly here.
235 long lastIndex = actorContext.getReplicatedLog().lastIndex();
236 leader.handleMessage(followerActor, new AppendEntriesReply(
237 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
238 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
240 followerActor.underlyingActor().clear();
242 for(int i=0;i<3;i++) {
243 sendReplicate(actorContext, lastIndex+i+1);
244 leader.handleMessage(followerActor, new AppendEntriesReply(
245 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
249 for(int i=3;i<5;i++) {
250 sendReplicate(actorContext, lastIndex + i + 1);
253 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
254 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
255 // get sent to the follower - but not the 5th
256 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
258 for(int i=0;i<4;i++) {
259 long expected = allMessages.get(i).getEntries().get(0).getIndex();
260 assertEquals(expected, i+2);
265 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
266 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
268 MockRaftActorContext actorContext = createActorContextWithFollower();
269 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
271 public FiniteDuration getHeartBeatInterval() {
272 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
277 actorContext.getTermInformation().update(term, "");
279 leader = new Leader(actorContext);
281 // Leader will send an immediate heartbeat - ignore it.
282 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
284 // The follower would normally reply - simulate that explicitly here.
285 long lastIndex = actorContext.getReplicatedLog().lastIndex();
286 leader.handleMessage(followerActor, new AppendEntriesReply(
287 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
288 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
290 followerActor.underlyingActor().clear();
292 sendReplicate(actorContext, lastIndex+1);
294 // Wait slightly longer than heartbeat duration
295 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
297 leader.handleMessage(leaderActor, new SendHeartBeat());
299 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
300 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
302 assertEquals(1, allMessages.get(0).getEntries().size());
303 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
304 assertEquals(1, allMessages.get(1).getEntries().size());
305 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
310 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
311 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
313 MockRaftActorContext actorContext = createActorContextWithFollower();
314 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
316 public FiniteDuration getHeartBeatInterval() {
317 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
322 actorContext.getTermInformation().update(term, "");
324 leader = new Leader(actorContext);
326 // Leader will send an immediate heartbeat - ignore it.
327 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
329 // The follower would normally reply - simulate that explicitly here.
330 long lastIndex = actorContext.getReplicatedLog().lastIndex();
331 leader.handleMessage(followerActor, new AppendEntriesReply(
332 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
333 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
335 followerActor.underlyingActor().clear();
337 for(int i=0;i<3;i++) {
338 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
339 leader.handleMessage(leaderActor, new SendHeartBeat());
342 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
343 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
347 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
348 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
350 MockRaftActorContext actorContext = createActorContextWithFollower();
351 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
353 public FiniteDuration getHeartBeatInterval() {
354 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
359 actorContext.getTermInformation().update(term, "");
361 leader = new Leader(actorContext);
363 // Leader will send an immediate heartbeat - ignore it.
364 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
366 // The follower would normally reply - simulate that explicitly here.
367 long lastIndex = actorContext.getReplicatedLog().lastIndex();
368 leader.handleMessage(followerActor, new AppendEntriesReply(
369 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
370 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
372 followerActor.underlyingActor().clear();
374 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
375 leader.handleMessage(leaderActor, new SendHeartBeat());
376 sendReplicate(actorContext, lastIndex+1);
378 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
379 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
381 assertEquals(0, allMessages.get(0).getEntries().size());
382 assertEquals(1, allMessages.get(1).getEntries().size());
387 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
388 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
390 MockRaftActorContext actorContext = createActorContext();
392 leader = new Leader(actorContext);
394 actorContext.setLastApplied(0);
396 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
397 long term = actorContext.getTermInformation().getCurrentTerm();
398 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
399 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
401 actorContext.getReplicatedLog().append(newEntry);
403 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
404 new Replicate(leaderActor, "state-id", newEntry));
406 // State should not change
407 assertTrue(raftBehavior instanceof Leader);
409 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
411 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
412 // one since lastApplied state is 0.
413 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
414 leaderActor, ApplyState.class);
415 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
417 for(int i = 0; i <= newLogIndex - 1; i++ ) {
418 ApplyState applyState = applyStateList.get(i);
419 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
420 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
423 ApplyState last = applyStateList.get((int) newLogIndex - 1);
424 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
425 assertEquals("getIdentifier", "state-id", last.getIdentifier());
429 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
430 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
432 MockRaftActorContext actorContext = createActorContextWithFollower();
434 Map<String, String> leadersSnapshot = new HashMap<>();
435 leadersSnapshot.put("1", "A");
436 leadersSnapshot.put("2", "B");
437 leadersSnapshot.put("3", "C");
440 actorContext.getReplicatedLog().removeFrom(0);
442 final int followersLastIndex = 2;
443 final int snapshotIndex = 3;
444 final int newEntryIndex = 4;
445 final int snapshotTerm = 1;
446 final int currentTerm = 2;
448 // set the snapshot variables in replicatedlog
449 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
450 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
451 actorContext.setCommitIndex(followersLastIndex);
452 //set follower timeout to 2 mins, helps during debugging
453 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
455 leader = new Leader(actorContext);
458 ReplicatedLogImplEntry entry =
459 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
460 new MockRaftActorContext.MockPayload("D"));
462 //update follower timestamp
463 leader.markFollowerActive(FOLLOWER_ID);
465 ByteString bs = toByteString(leadersSnapshot);
466 leader.setSnapshot(Optional.of(bs));
467 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
468 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
470 //send first chunk and no InstallSnapshotReply received yet
472 fts.incrementChunkIndex();
474 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
475 TimeUnit.MILLISECONDS);
477 leader.handleMessage(leaderActor, new SendHeartBeat());
479 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
481 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
483 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
485 //InstallSnapshotReply received
486 fts.markSendStatus(true);
488 leader.handleMessage(leaderActor, new SendHeartBeat());
490 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
492 assertEquals(snapshotIndex, is.getLastIncludedIndex());
496 public void testSendAppendEntriesSnapshotScenario() throws Exception {
497 logStart("testSendAppendEntriesSnapshotScenario");
499 MockRaftActorContext actorContext = createActorContextWithFollower();
501 Map<String, String> leadersSnapshot = new HashMap<>();
502 leadersSnapshot.put("1", "A");
503 leadersSnapshot.put("2", "B");
504 leadersSnapshot.put("3", "C");
507 actorContext.getReplicatedLog().removeFrom(0);
509 final int followersLastIndex = 2;
510 final int snapshotIndex = 3;
511 final int newEntryIndex = 4;
512 final int snapshotTerm = 1;
513 final int currentTerm = 2;
515 // set the snapshot variables in replicatedlog
516 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
517 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
518 actorContext.setCommitIndex(followersLastIndex);
520 leader = new Leader(actorContext);
522 // Leader will send an immediate heartbeat - ignore it.
523 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
526 ReplicatedLogImplEntry entry =
527 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
528 new MockRaftActorContext.MockPayload("D"));
530 actorContext.getReplicatedLog().append(entry);
532 //update follower timestamp
533 leader.markFollowerActive(FOLLOWER_ID);
535 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
536 RaftActorBehavior raftBehavior = leader.handleMessage(
537 leaderActor, new Replicate(null, "state-id", entry));
539 assertTrue(raftBehavior instanceof Leader);
541 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
545 public void testInitiateInstallSnapshot() throws Exception {
546 logStart("testInitiateInstallSnapshot");
548 MockRaftActorContext actorContext = createActorContextWithFollower();
550 Map<String, String> leadersSnapshot = new HashMap<>();
551 leadersSnapshot.put("1", "A");
552 leadersSnapshot.put("2", "B");
553 leadersSnapshot.put("3", "C");
556 actorContext.getReplicatedLog().removeFrom(0);
558 final int followersLastIndex = 2;
559 final int snapshotIndex = 3;
560 final int newEntryIndex = 4;
561 final int snapshotTerm = 1;
562 final int currentTerm = 2;
564 // set the snapshot variables in replicatedlog
565 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
566 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
567 actorContext.setLastApplied(3);
568 actorContext.setCommitIndex(followersLastIndex);
570 leader = new Leader(actorContext);
572 // Leader will send an immediate heartbeat - ignore it.
573 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
575 // set the snapshot as absent and check if capture-snapshot is invoked.
576 leader.setSnapshot(Optional.<ByteString>absent());
579 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
580 new MockRaftActorContext.MockPayload("D"));
582 actorContext.getReplicatedLog().append(entry);
584 //update follower timestamp
585 leader.markFollowerActive(FOLLOWER_ID);
587 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
589 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
591 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
593 assertTrue(cs.isInstallSnapshotInitiated());
594 assertEquals(3, cs.getLastAppliedIndex());
595 assertEquals(1, cs.getLastAppliedTerm());
596 assertEquals(4, cs.getLastIndex());
597 assertEquals(2, cs.getLastTerm());
599 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
600 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
602 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
606 public void testInstallSnapshot() throws Exception {
607 logStart("testInstallSnapshot");
609 MockRaftActorContext actorContext = createActorContextWithFollower();
611 Map<String, String> leadersSnapshot = new HashMap<>();
612 leadersSnapshot.put("1", "A");
613 leadersSnapshot.put("2", "B");
614 leadersSnapshot.put("3", "C");
617 actorContext.getReplicatedLog().removeFrom(0);
619 final int followersLastIndex = 2;
620 final int snapshotIndex = 3;
621 final int snapshotTerm = 1;
622 final int currentTerm = 2;
624 // set the snapshot variables in replicatedlog
625 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
626 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
627 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
628 actorContext.setCommitIndex(followersLastIndex);
630 leader = new Leader(actorContext);
632 // Ignore initial heartbeat.
633 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
635 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
636 new SendInstallSnapshot(toByteString(leadersSnapshot)));
638 assertTrue(raftBehavior instanceof Leader);
640 // check if installsnapshot gets called with the correct values.
642 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
644 assertNotNull(installSnapshot.getData());
645 assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
646 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
648 assertEquals(currentTerm, installSnapshot.getTerm());
652 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
653 logStart("testHandleInstallSnapshotReplyLastChunk");
655 MockRaftActorContext actorContext = createActorContextWithFollower();
657 final int followersLastIndex = 2;
658 final int snapshotIndex = 3;
659 final int snapshotTerm = 1;
660 final int currentTerm = 2;
662 actorContext.setCommitIndex(followersLastIndex);
664 leader = new Leader(actorContext);
666 // Ignore initial heartbeat.
667 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
669 Map<String, String> leadersSnapshot = new HashMap<>();
670 leadersSnapshot.put("1", "A");
671 leadersSnapshot.put("2", "B");
672 leadersSnapshot.put("3", "C");
674 // set the snapshot variables in replicatedlog
676 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
677 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
678 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
680 ByteString bs = toByteString(leadersSnapshot);
681 leader.setSnapshot(Optional.of(bs));
682 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
683 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
684 while(!fts.isLastChunk(fts.getChunkIndex())) {
686 fts.incrementChunkIndex();
690 actorContext.getReplicatedLog().removeFrom(0);
692 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
693 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
695 assertTrue(raftBehavior instanceof Leader);
697 assertEquals(0, leader.followerSnapshotSize());
698 assertEquals(1, leader.followerLogSize());
699 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
701 assertEquals(snapshotIndex, fli.getMatchIndex());
702 assertEquals(snapshotIndex, fli.getMatchIndex());
703 assertEquals(snapshotIndex + 1, fli.getNextIndex());
707 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
708 logStart("testSendSnapshotfromInstallSnapshotReply");
710 MockRaftActorContext actorContext = createActorContextWithFollower();
712 final int followersLastIndex = 2;
713 final int snapshotIndex = 3;
714 final int snapshotTerm = 1;
715 final int currentTerm = 2;
717 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
719 public int getSnapshotChunkSize() {
723 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
724 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
726 actorContext.setConfigParams(configParams);
727 actorContext.setCommitIndex(followersLastIndex);
729 leader = new Leader(actorContext);
731 Map<String, String> leadersSnapshot = new HashMap<>();
732 leadersSnapshot.put("1", "A");
733 leadersSnapshot.put("2", "B");
734 leadersSnapshot.put("3", "C");
736 // set the snapshot variables in replicatedlog
737 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
738 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
739 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
741 ByteString bs = toByteString(leadersSnapshot);
742 leader.setSnapshot(Optional.of(bs));
744 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
746 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
748 assertEquals(1, installSnapshot.getChunkIndex());
749 assertEquals(3, installSnapshot.getTotalChunks());
751 followerActor.underlyingActor().clear();
752 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
753 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
755 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
757 assertEquals(2, installSnapshot.getChunkIndex());
758 assertEquals(3, installSnapshot.getTotalChunks());
760 followerActor.underlyingActor().clear();
761 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
762 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
764 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
766 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
767 followerActor.underlyingActor().clear();
768 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
769 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
771 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
773 Assert.assertNull(installSnapshot);
778 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
779 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
781 MockRaftActorContext actorContext = createActorContextWithFollower();
783 final int followersLastIndex = 2;
784 final int snapshotIndex = 3;
785 final int snapshotTerm = 1;
786 final int currentTerm = 2;
788 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
790 public int getSnapshotChunkSize() {
795 actorContext.setCommitIndex(followersLastIndex);
797 leader = new Leader(actorContext);
799 Map<String, String> leadersSnapshot = new HashMap<>();
800 leadersSnapshot.put("1", "A");
801 leadersSnapshot.put("2", "B");
802 leadersSnapshot.put("3", "C");
804 // set the snapshot variables in replicatedlog
805 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
806 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
807 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
809 ByteString bs = toByteString(leadersSnapshot);
810 leader.setSnapshot(Optional.of(bs));
812 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
813 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
815 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
817 assertEquals(1, installSnapshot.getChunkIndex());
818 assertEquals(3, installSnapshot.getTotalChunks());
820 followerActor.underlyingActor().clear();
822 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
823 FOLLOWER_ID, -1, false));
825 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
826 TimeUnit.MILLISECONDS);
828 leader.handleMessage(leaderActor, new SendHeartBeat());
830 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
832 assertEquals(1, installSnapshot.getChunkIndex());
833 assertEquals(3, installSnapshot.getTotalChunks());
837 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
838 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
840 MockRaftActorContext actorContext = createActorContextWithFollower();
842 final int followersLastIndex = 2;
843 final int snapshotIndex = 3;
844 final int snapshotTerm = 1;
845 final int currentTerm = 2;
847 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
849 public int getSnapshotChunkSize() {
854 actorContext.setCommitIndex(followersLastIndex);
856 leader = new Leader(actorContext);
858 Map<String, String> leadersSnapshot = new HashMap<>();
859 leadersSnapshot.put("1", "A");
860 leadersSnapshot.put("2", "B");
861 leadersSnapshot.put("3", "C");
863 // set the snapshot variables in replicatedlog
864 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
865 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
866 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
868 ByteString bs = toByteString(leadersSnapshot);
869 leader.setSnapshot(Optional.of(bs));
871 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
873 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
875 assertEquals(1, installSnapshot.getChunkIndex());
876 assertEquals(3, installSnapshot.getTotalChunks());
877 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
879 int hashCode = installSnapshot.getData().hashCode();
881 followerActor.underlyingActor().clear();
883 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
884 FOLLOWER_ID, 1, true));
886 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
888 assertEquals(2, installSnapshot.getChunkIndex());
889 assertEquals(3, installSnapshot.getTotalChunks());
890 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
894 public void testFollowerToSnapshotLogic() {
895 logStart("testFollowerToSnapshotLogic");
897 MockRaftActorContext actorContext = createActorContext();
899 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
901 public int getSnapshotChunkSize() {
906 leader = new Leader(actorContext);
908 Map<String, String> leadersSnapshot = new HashMap<>();
909 leadersSnapshot.put("1", "A");
910 leadersSnapshot.put("2", "B");
911 leadersSnapshot.put("3", "C");
913 ByteString bs = toByteString(leadersSnapshot);
914 byte[] barray = bs.toByteArray();
916 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
917 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
919 assertEquals(bs.size(), barray.length);
922 for (int i=0; i < barray.length; i = i + 50) {
926 if (i + 50 > barray.length) {
930 ByteString chunk = fts.getNextChunk();
931 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
932 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
934 fts.markSendStatus(true);
935 if (!fts.isLastChunk(chunkIndex)) {
936 fts.incrementChunkIndex();
940 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
943 @Override protected RaftActorBehavior createBehavior(
944 RaftActorContext actorContext) {
945 return new Leader(actorContext);
949 protected MockRaftActorContext createActorContext() {
950 return createActorContext(leaderActor);
954 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
955 return createActorContext(LEADER_ID, actorRef);
958 private MockRaftActorContext createActorContextWithFollower() {
959 MockRaftActorContext actorContext = createActorContext();
960 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
961 followerActor.path().toString()).build());
965 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
966 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
967 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
968 configParams.setElectionTimeoutFactor(100000);
969 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
970 context.setConfigParams(configParams);
971 context.setPayloadVersion(payloadVersion);
975 private MockRaftActorContext createFollowerActorContextWithLeader() {
976 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
977 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
978 followerConfig.setElectionTimeoutFactor(10000);
979 followerActorContext.setConfigParams(followerConfig);
980 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
981 return followerActorContext;
985 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
986 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
988 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
990 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
992 Follower follower = new Follower(followerActorContext);
993 followerActor.underlyingActor().setBehavior(follower);
995 Map<String, String> peerAddresses = new HashMap<>();
996 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
998 leaderActorContext.setPeerAddresses(peerAddresses);
1000 leaderActorContext.getReplicatedLog().removeFrom(0);
1003 leaderActorContext.setReplicatedLog(
1004 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1006 leaderActorContext.setCommitIndex(1);
1008 followerActorContext.getReplicatedLog().removeFrom(0);
1010 // follower too has the exact same log entries and has the same commit index
1011 followerActorContext.setReplicatedLog(
1012 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1014 followerActorContext.setCommitIndex(1);
1016 leader = new Leader(leaderActorContext);
1018 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1020 assertEquals(1, appendEntries.getLeaderCommit());
1021 assertEquals(0, appendEntries.getEntries().size());
1022 assertEquals(0, appendEntries.getPrevLogIndex());
1024 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1025 leaderActor, AppendEntriesReply.class);
1027 assertEquals(2, appendEntriesReply.getLogLastIndex());
1028 assertEquals(1, appendEntriesReply.getLogLastTerm());
1030 // follower returns its next index
1031 assertEquals(2, appendEntriesReply.getLogLastIndex());
1032 assertEquals(1, appendEntriesReply.getLogLastTerm());
1038 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1039 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1041 MockRaftActorContext leaderActorContext = createActorContext();
1043 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1044 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1046 Follower follower = new Follower(followerActorContext);
1047 followerActor.underlyingActor().setBehavior(follower);
1049 Map<String, String> leaderPeerAddresses = new HashMap<>();
1050 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1052 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1054 leaderActorContext.getReplicatedLog().removeFrom(0);
1056 leaderActorContext.setReplicatedLog(
1057 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1059 leaderActorContext.setCommitIndex(1);
1061 followerActorContext.getReplicatedLog().removeFrom(0);
1063 followerActorContext.setReplicatedLog(
1064 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1066 // follower has the same log entries but its commit index > leaders commit index
1067 followerActorContext.setCommitIndex(2);
1069 leader = new Leader(leaderActorContext);
1071 // Initial heartbeat
1072 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1074 assertEquals(1, appendEntries.getLeaderCommit());
1075 assertEquals(0, appendEntries.getEntries().size());
1076 assertEquals(0, appendEntries.getPrevLogIndex());
1078 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1079 leaderActor, AppendEntriesReply.class);
1081 assertEquals(2, appendEntriesReply.getLogLastIndex());
1082 assertEquals(1, appendEntriesReply.getLogLastTerm());
1084 leaderActor.underlyingActor().setBehavior(follower);
1085 leader.handleMessage(followerActor, appendEntriesReply);
1087 leaderActor.underlyingActor().clear();
1088 followerActor.underlyingActor().clear();
1090 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1091 TimeUnit.MILLISECONDS);
1093 leader.handleMessage(leaderActor, new SendHeartBeat());
1095 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1097 assertEquals(2, appendEntries.getLeaderCommit());
1098 assertEquals(0, appendEntries.getEntries().size());
1099 assertEquals(2, appendEntries.getPrevLogIndex());
1101 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1103 assertEquals(2, appendEntriesReply.getLogLastIndex());
1104 assertEquals(1, appendEntriesReply.getLogLastTerm());
1106 assertEquals(2, followerActorContext.getCommitIndex());
1112 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1113 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1115 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1116 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1117 new FiniteDuration(1000, TimeUnit.SECONDS));
1119 leaderActorContext.setReplicatedLog(
1120 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1121 long leaderCommitIndex = 2;
1122 leaderActorContext.setCommitIndex(leaderCommitIndex);
1123 leaderActorContext.setLastApplied(leaderCommitIndex);
1125 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1126 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1128 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1130 followerActorContext.setReplicatedLog(
1131 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1132 followerActorContext.setCommitIndex(0);
1133 followerActorContext.setLastApplied(0);
1135 Follower follower = new Follower(followerActorContext);
1136 followerActor.underlyingActor().setBehavior(follower);
1138 leader = new Leader(leaderActorContext);
1140 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1141 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1143 MessageCollectorActor.clearMessages(followerActor);
1144 MessageCollectorActor.clearMessages(leaderActor);
1146 // Verify initial AppendEntries sent with the leader's current commit index.
1147 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1148 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1149 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1151 leaderActor.underlyingActor().setBehavior(leader);
1153 leader.handleMessage(followerActor, appendEntriesReply);
1155 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1156 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1158 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1159 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1160 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1162 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1163 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1164 appendEntries.getEntries().get(0).getData());
1165 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1166 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1167 appendEntries.getEntries().get(1).getData());
1169 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1170 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1172 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1174 ApplyState applyState = applyStateList.get(0);
1175 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1176 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1177 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1178 applyState.getReplicatedLogEntry().getData());
1180 applyState = applyStateList.get(1);
1181 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1182 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1183 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1184 applyState.getReplicatedLogEntry().getData());
1186 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1187 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1191 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1192 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1194 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1195 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1196 new FiniteDuration(1000, TimeUnit.SECONDS));
1198 leaderActorContext.setReplicatedLog(
1199 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1200 long leaderCommitIndex = 1;
1201 leaderActorContext.setCommitIndex(leaderCommitIndex);
1202 leaderActorContext.setLastApplied(leaderCommitIndex);
1204 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1205 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1207 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1209 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1210 followerActorContext.setCommitIndex(-1);
1211 followerActorContext.setLastApplied(-1);
1213 Follower follower = new Follower(followerActorContext);
1214 followerActor.underlyingActor().setBehavior(follower);
1216 leader = new Leader(leaderActorContext);
1218 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1219 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1221 MessageCollectorActor.clearMessages(followerActor);
1222 MessageCollectorActor.clearMessages(leaderActor);
1224 // Verify initial AppendEntries sent with the leader's current commit index.
1225 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1226 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1227 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1229 leaderActor.underlyingActor().setBehavior(leader);
1231 leader.handleMessage(followerActor, appendEntriesReply);
1233 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1234 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1236 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1237 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1238 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1240 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1241 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1242 appendEntries.getEntries().get(0).getData());
1243 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1244 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1245 appendEntries.getEntries().get(1).getData());
1247 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1248 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1250 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1252 ApplyState applyState = applyStateList.get(0);
1253 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1254 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1255 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1256 applyState.getReplicatedLogEntry().getData());
1258 applyState = applyStateList.get(1);
1259 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1260 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1261 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1262 applyState.getReplicatedLogEntry().getData());
1264 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1265 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1269 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1270 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1272 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1273 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1274 new FiniteDuration(1000, TimeUnit.SECONDS));
1276 leaderActorContext.setReplicatedLog(
1277 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1278 long leaderCommitIndex = 1;
1279 leaderActorContext.setCommitIndex(leaderCommitIndex);
1280 leaderActorContext.setLastApplied(leaderCommitIndex);
1282 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1283 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1285 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1287 followerActorContext.setReplicatedLog(
1288 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1289 followerActorContext.setCommitIndex(-1);
1290 followerActorContext.setLastApplied(-1);
1292 Follower follower = new Follower(followerActorContext);
1293 followerActor.underlyingActor().setBehavior(follower);
1295 leader = new Leader(leaderActorContext);
1297 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1298 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1300 MessageCollectorActor.clearMessages(followerActor);
1301 MessageCollectorActor.clearMessages(leaderActor);
1303 // Verify initial AppendEntries sent with the leader's current commit index.
1304 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1305 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1306 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1308 leaderActor.underlyingActor().setBehavior(leader);
1310 leader.handleMessage(followerActor, appendEntriesReply);
1312 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1313 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1315 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1316 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1317 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1319 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1320 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1321 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1322 appendEntries.getEntries().get(0).getData());
1323 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1324 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1325 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1326 appendEntries.getEntries().get(1).getData());
1328 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1329 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1331 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1333 ApplyState applyState = applyStateList.get(0);
1334 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1335 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1336 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1337 applyState.getReplicatedLogEntry().getData());
1339 applyState = applyStateList.get(1);
1340 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1341 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1342 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1343 applyState.getReplicatedLogEntry().getData());
1345 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1346 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1347 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1351 public void testHandleAppendEntriesReplySuccess() throws Exception {
1352 logStart("testHandleAppendEntriesReplySuccess");
1354 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1356 leaderActorContext.setReplicatedLog(
1357 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1359 leaderActorContext.setCommitIndex(1);
1360 leaderActorContext.setLastApplied(1);
1361 leaderActorContext.getTermInformation().update(1, "leader");
1363 leader = new Leader(leaderActorContext);
1365 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1367 short payloadVersion = 5;
1368 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1370 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1372 assertEquals(RaftState.Leader, raftActorBehavior.state());
1374 assertEquals(2, leaderActorContext.getCommitIndex());
1376 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1377 leaderActor, ApplyJournalEntries.class);
1379 assertEquals(2, leaderActorContext.getLastApplied());
1381 assertEquals(2, applyJournalEntries.getToIndex());
1383 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1386 assertEquals(1,applyStateList.size());
1388 ApplyState applyState = applyStateList.get(0);
1390 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1392 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1393 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1397 public void testHandleAppendEntriesReplyUnknownFollower(){
1398 logStart("testHandleAppendEntriesReplyUnknownFollower");
1400 MockRaftActorContext leaderActorContext = createActorContext();
1402 leader = new Leader(leaderActorContext);
1404 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1406 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1408 assertEquals(RaftState.Leader, raftActorBehavior.state());
1412 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1413 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1415 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1416 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1417 new FiniteDuration(1000, TimeUnit.SECONDS));
1418 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2);
1420 leaderActorContext.setReplicatedLog(
1421 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1422 long leaderCommitIndex = 3;
1423 leaderActorContext.setCommitIndex(leaderCommitIndex);
1424 leaderActorContext.setLastApplied(leaderCommitIndex);
1426 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1427 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1428 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1429 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1431 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1433 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1434 followerActorContext.setCommitIndex(-1);
1435 followerActorContext.setLastApplied(-1);
1437 Follower follower = new Follower(followerActorContext);
1438 followerActor.underlyingActor().setBehavior(follower);
1440 leader = new Leader(leaderActorContext);
1442 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1443 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1445 MessageCollectorActor.clearMessages(followerActor);
1446 MessageCollectorActor.clearMessages(leaderActor);
1448 // Verify initial AppendEntries sent with the leader's current commit index.
1449 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1450 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1451 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1453 leaderActor.underlyingActor().setBehavior(leader);
1455 leader.handleMessage(followerActor, appendEntriesReply);
1457 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1458 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1460 appendEntries = appendEntriesList.get(0);
1461 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1462 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1463 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1465 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1466 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1467 appendEntries.getEntries().get(0).getData());
1468 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1469 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1470 appendEntries.getEntries().get(1).getData());
1472 appendEntries = appendEntriesList.get(1);
1473 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1474 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1475 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1477 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1478 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1479 appendEntries.getEntries().get(0).getData());
1480 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1481 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1482 appendEntries.getEntries().get(1).getData());
1484 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1485 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1487 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1489 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1490 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1494 public void testHandleRequestVoteReply(){
1495 logStart("testHandleRequestVoteReply");
1497 MockRaftActorContext leaderActorContext = createActorContext();
1499 leader = new Leader(leaderActorContext);
1501 // Should be a no-op.
1502 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1503 new RequestVoteReply(1, true));
1505 assertEquals(RaftState.Leader, raftActorBehavior.state());
1507 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1509 assertEquals(RaftState.Leader, raftActorBehavior.state());
1513 public void testIsolatedLeaderCheckNoFollowers() {
1514 logStart("testIsolatedLeaderCheckNoFollowers");
1516 MockRaftActorContext leaderActorContext = createActorContext();
1518 leader = new Leader(leaderActorContext);
1519 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1520 Assert.assertTrue(behavior instanceof Leader);
1524 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1525 logStart("testIsolatedLeaderCheckTwoFollowers");
1527 new JavaTestKit(getSystem()) {{
1529 ActorRef followerActor1 = getTestActor();
1530 ActorRef followerActor2 = getTestActor();
1532 MockRaftActorContext leaderActorContext = createActorContext();
1534 Map<String, String> peerAddresses = new HashMap<>();
1535 peerAddresses.put("follower-1", followerActor1.path().toString());
1536 peerAddresses.put("follower-2", followerActor2.path().toString());
1538 leaderActorContext.setPeerAddresses(peerAddresses);
1540 leader = new Leader(leaderActorContext);
1542 leader.markFollowerActive("follower-1");
1543 leader.markFollowerActive("follower-2");
1544 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1545 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1546 behavior instanceof Leader);
1548 // kill 1 follower and verify if that got killed
1549 final JavaTestKit probe = new JavaTestKit(getSystem());
1550 probe.watch(followerActor1);
1551 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1552 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1553 assertEquals(termMsg1.getActor(), followerActor1);
1555 leader.markFollowerInActive("follower-1");
1556 leader.markFollowerActive("follower-2");
1557 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1558 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1559 behavior instanceof Leader);
1561 // kill 2nd follower and leader should change to Isolated leader
1562 followerActor2.tell(PoisonPill.getInstance(), null);
1563 probe.watch(followerActor2);
1564 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1565 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1566 assertEquals(termMsg2.getActor(), followerActor2);
1568 leader.markFollowerInActive("follower-2");
1569 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1570 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1571 behavior instanceof IsolatedLeader);
1576 public void testLaggingFollowerStarvation() throws Exception {
1577 logStart("testLaggingFollowerStarvation");
1578 new JavaTestKit(getSystem()) {{
1579 String leaderActorId = actorFactory.generateActorId("leader");
1580 String follower1ActorId = actorFactory.generateActorId("follower");
1581 String follower2ActorId = actorFactory.generateActorId("follower");
1583 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1584 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1585 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1586 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1588 MockRaftActorContext leaderActorContext =
1589 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1591 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1592 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1593 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1595 leaderActorContext.setConfigParams(configParams);
1597 leaderActorContext.setReplicatedLog(
1598 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1600 Map<String, String> peerAddresses = new HashMap<>();
1601 peerAddresses.put(follower1ActorId,
1602 follower1Actor.path().toString());
1603 peerAddresses.put(follower2ActorId,
1604 follower2Actor.path().toString());
1606 leaderActorContext.setPeerAddresses(peerAddresses);
1607 leaderActorContext.getTermInformation().update(1, leaderActorId);
1609 RaftActorBehavior leader = createBehavior(leaderActorContext);
1611 leaderActor.underlyingActor().setBehavior(leader);
1613 for(int i=1;i<6;i++) {
1614 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1615 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1616 assertTrue(newBehavior == leader);
1617 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1620 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1621 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1623 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1624 heartbeats.size() > 1);
1626 // Check if follower-2 got AppendEntries during this time and was not starved
1627 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1629 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1630 appendEntries.size() > 1);
1636 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1637 ActorRef actorRef, RaftRPC rpc) throws Exception {
1638 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1639 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1642 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1644 private final long electionTimeOutIntervalMillis;
1645 private final int snapshotChunkSize;
1647 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1649 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1650 this.snapshotChunkSize = snapshotChunkSize;
1654 public FiniteDuration getElectionTimeOutInterval() {
1655 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1659 public int getSnapshotChunkSize() {
1660 return snapshotChunkSize;