2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Mockito.mock;
15 import static org.mockito.Mockito.never;
16 import static org.mockito.Mockito.verify;
17 import akka.actor.ActorRef;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.collect.ImmutableMap;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import com.google.protobuf.ByteString;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.List;
31 import java.util.concurrent.TimeUnit;
32 import org.junit.After;
33 import org.junit.Assert;
34 import org.junit.Test;
35 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
36 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
37 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
38 import org.opendaylight.controller.cluster.raft.RaftActorContext;
39 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
40 import org.opendaylight.controller.cluster.raft.RaftState;
41 import org.opendaylight.controller.cluster.raft.RaftVersions;
42 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
43 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
44 import org.opendaylight.controller.cluster.raft.SerializationUtils;
45 import org.opendaylight.controller.cluster.raft.Snapshot;
46 import org.opendaylight.controller.cluster.raft.VotingState;
47 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
49 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
50 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
51 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
54 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
57 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
59 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
60 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
61 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
62 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
63 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
64 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
65 import scala.concurrent.duration.FiniteDuration;
67 public class LeaderTest extends AbstractLeaderTest {
69 static final String FOLLOWER_ID = "follower";
70 public static final String LEADER_ID = "leader";
72 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
73 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
75 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
76 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
78 private Leader leader;
79 private final short payloadVersion = 5;
83 public void tearDown() throws Exception {
92 public void testHandleMessageForUnknownMessage() throws Exception {
93 logStart("testHandleMessageForUnknownMessage");
95 leader = new Leader(createActorContext());
97 // handle message should return the Leader state when it receives an
99 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
100 Assert.assertTrue(behavior instanceof Leader);
104 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
105 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
107 MockRaftActorContext actorContext = createActorContextWithFollower();
108 short payloadVersion = (short)5;
109 actorContext.setPayloadVersion(payloadVersion);
112 actorContext.getTermInformation().update(term, "");
114 leader = new Leader(actorContext);
116 // Leader should send an immediate heartbeat with no entries as follower is inactive.
117 long lastIndex = actorContext.getReplicatedLog().lastIndex();
118 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
119 assertEquals("getTerm", term, appendEntries.getTerm());
120 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
121 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
122 assertEquals("Entries size", 0, appendEntries.getEntries().size());
123 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
125 // The follower would normally reply - simulate that explicitly here.
126 leader.handleMessage(followerActor, new AppendEntriesReply(
127 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
128 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
130 followerActor.underlyingActor().clear();
132 // Sleep for the heartbeat interval so AppendEntries is sent.
133 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
134 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
136 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
138 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
139 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
140 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
141 assertEquals("Entries size", 1, appendEntries.getEntries().size());
142 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
143 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
144 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
148 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
149 return sendReplicate(actorContext, 1, index);
152 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
153 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
154 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
155 term, index, payload);
156 actorContext.getReplicatedLog().append(newEntry);
157 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
161 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
162 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
164 MockRaftActorContext actorContext = createActorContextWithFollower();
167 actorContext.getTermInformation().update(term, "");
169 leader = new Leader(actorContext);
171 // Leader will send an immediate heartbeat - ignore it.
172 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
174 // The follower would normally reply - simulate that explicitly here.
175 long lastIndex = actorContext.getReplicatedLog().lastIndex();
176 leader.handleMessage(followerActor, new AppendEntriesReply(
177 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
178 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
180 followerActor.underlyingActor().clear();
182 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
184 // State should not change
185 assertTrue(raftBehavior instanceof Leader);
187 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
188 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
189 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
190 assertEquals("Entries size", 1, appendEntries.getEntries().size());
191 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
192 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
193 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
194 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
198 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
199 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
201 MockRaftActorContext actorContext = createActorContextWithFollower();
203 // The raft context is initialized with a couple log entries. However the commitIndex
204 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
205 // committed and applied. Now it regains leadership with a higher term (2).
206 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
207 long newTerm = prevTerm + 1;
208 actorContext.getTermInformation().update(newTerm, "");
210 leader = new Leader(actorContext);
212 // Leader will send an immediate heartbeat - ignore it.
213 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
215 // The follower replies with the leader's current last index and term, simulating that it is
216 // up to date with the leader.
217 long lastIndex = actorContext.getReplicatedLog().lastIndex();
218 leader.handleMessage(followerActor, new AppendEntriesReply(
219 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
221 // The commit index should not get updated even though consensus was reached. This is b/c the
222 // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
223 // from previous terms by counting replicas".
224 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
226 followerActor.underlyingActor().clear();
228 // Now replicate a new entry with the new term 2.
229 long newIndex = lastIndex + 1;
230 sendReplicate(actorContext, newTerm, newIndex);
232 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
233 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
234 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
235 assertEquals("Entries size", 1, appendEntries.getEntries().size());
236 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
237 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
238 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
240 // The follower replies with success. The leader should now update the commit index to the new index
241 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
242 // prior entries are committed indirectly".
243 leader.handleMessage(followerActor, new AppendEntriesReply(
244 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
246 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
250 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
251 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
253 MockRaftActorContext actorContext = createActorContextWithFollower();
254 actorContext.setRaftPolicy(createRaftPolicy(true, true));
257 actorContext.getTermInformation().update(term, "");
259 leader = new Leader(actorContext);
261 // Leader will send an immediate heartbeat - ignore it.
262 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
264 // The follower would normally reply - simulate that explicitly here.
265 long lastIndex = actorContext.getReplicatedLog().lastIndex();
266 leader.handleMessage(followerActor, new AppendEntriesReply(
267 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
268 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
270 followerActor.underlyingActor().clear();
272 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
274 // State should not change
275 assertTrue(raftBehavior instanceof Leader);
277 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
278 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
279 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
280 assertEquals("Entries size", 1, appendEntries.getEntries().size());
281 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
282 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
283 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
284 assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
288 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
289 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
291 MockRaftActorContext actorContext = createActorContextWithFollower();
292 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
294 public FiniteDuration getHeartBeatInterval() {
295 return FiniteDuration.apply(5, TimeUnit.SECONDS);
300 actorContext.getTermInformation().update(term, "");
302 leader = new Leader(actorContext);
304 // Leader will send an immediate heartbeat - ignore it.
305 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
307 // The follower would normally reply - simulate that explicitly here.
308 long lastIndex = actorContext.getReplicatedLog().lastIndex();
309 leader.handleMessage(followerActor, new AppendEntriesReply(
310 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
311 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
313 followerActor.underlyingActor().clear();
315 for(int i=0;i<5;i++) {
316 sendReplicate(actorContext, lastIndex+i+1);
319 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
320 // We expect only 1 message to be sent because of two reasons,
321 // - an append entries reply was not received
322 // - the heartbeat interval has not expired
323 // In this scenario if multiple messages are sent they would likely be duplicates
324 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
328 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
329 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
331 MockRaftActorContext actorContext = createActorContextWithFollower();
332 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
334 public FiniteDuration getHeartBeatInterval() {
335 return FiniteDuration.apply(5, TimeUnit.SECONDS);
340 actorContext.getTermInformation().update(term, "");
342 leader = new Leader(actorContext);
344 // Leader will send an immediate heartbeat - ignore it.
345 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
347 // The follower would normally reply - simulate that explicitly here.
348 long lastIndex = actorContext.getReplicatedLog().lastIndex();
349 leader.handleMessage(followerActor, new AppendEntriesReply(
350 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
351 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
353 followerActor.underlyingActor().clear();
355 for(int i=0;i<3;i++) {
356 sendReplicate(actorContext, lastIndex+i+1);
357 leader.handleMessage(followerActor, new AppendEntriesReply(
358 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
362 for(int i=3;i<5;i++) {
363 sendReplicate(actorContext, lastIndex + i + 1);
366 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
367 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
368 // get sent to the follower - but not the 5th
369 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
371 for(int i=0;i<4;i++) {
372 long expected = allMessages.get(i).getEntries().get(0).getIndex();
373 assertEquals(expected, i+2);
378 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
379 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
381 MockRaftActorContext actorContext = createActorContextWithFollower();
382 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
384 public FiniteDuration getHeartBeatInterval() {
385 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
390 actorContext.getTermInformation().update(term, "");
392 leader = new Leader(actorContext);
394 // Leader will send an immediate heartbeat - ignore it.
395 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
397 // The follower would normally reply - simulate that explicitly here.
398 long lastIndex = actorContext.getReplicatedLog().lastIndex();
399 leader.handleMessage(followerActor, new AppendEntriesReply(
400 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
401 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
403 followerActor.underlyingActor().clear();
405 sendReplicate(actorContext, lastIndex+1);
407 // Wait slightly longer than heartbeat duration
408 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
410 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
412 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
413 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
415 assertEquals(1, allMessages.get(0).getEntries().size());
416 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
417 assertEquals(1, allMessages.get(1).getEntries().size());
418 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
423 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
424 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
426 MockRaftActorContext actorContext = createActorContextWithFollower();
427 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
429 public FiniteDuration getHeartBeatInterval() {
430 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
435 actorContext.getTermInformation().update(term, "");
437 leader = new Leader(actorContext);
439 // Leader will send an immediate heartbeat - ignore it.
440 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
442 // The follower would normally reply - simulate that explicitly here.
443 long lastIndex = actorContext.getReplicatedLog().lastIndex();
444 leader.handleMessage(followerActor, new AppendEntriesReply(
445 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
446 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
448 followerActor.underlyingActor().clear();
450 for(int i=0;i<3;i++) {
451 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
452 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
455 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
456 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
460 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
461 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
463 MockRaftActorContext actorContext = createActorContextWithFollower();
464 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
466 public FiniteDuration getHeartBeatInterval() {
467 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
472 actorContext.getTermInformation().update(term, "");
474 leader = new Leader(actorContext);
476 // Leader will send an immediate heartbeat - ignore it.
477 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
479 // The follower would normally reply - simulate that explicitly here.
480 long lastIndex = actorContext.getReplicatedLog().lastIndex();
481 leader.handleMessage(followerActor, new AppendEntriesReply(
482 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
483 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
485 followerActor.underlyingActor().clear();
487 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
488 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
489 sendReplicate(actorContext, lastIndex+1);
491 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
492 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
494 assertEquals(0, allMessages.get(0).getEntries().size());
495 assertEquals(1, allMessages.get(1).getEntries().size());
500 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
501 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
503 MockRaftActorContext actorContext = createActorContext();
505 leader = new Leader(actorContext);
507 actorContext.setLastApplied(0);
509 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
510 long term = actorContext.getTermInformation().getCurrentTerm();
511 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
512 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
514 actorContext.getReplicatedLog().append(newEntry);
516 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
517 new Replicate(leaderActor, "state-id", newEntry));
519 // State should not change
520 assertTrue(raftBehavior instanceof Leader);
522 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
524 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
525 // one since lastApplied state is 0.
526 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
527 leaderActor, ApplyState.class);
528 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
530 for(int i = 0; i <= newLogIndex - 1; i++ ) {
531 ApplyState applyState = applyStateList.get(i);
532 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
533 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
536 ApplyState last = applyStateList.get((int) newLogIndex - 1);
537 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
538 assertEquals("getIdentifier", "state-id", last.getIdentifier());
542 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
543 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
545 MockRaftActorContext actorContext = createActorContextWithFollower();
547 Map<String, String> leadersSnapshot = new HashMap<>();
548 leadersSnapshot.put("1", "A");
549 leadersSnapshot.put("2", "B");
550 leadersSnapshot.put("3", "C");
553 actorContext.getReplicatedLog().removeFrom(0);
555 final int commitIndex = 3;
556 final int snapshotIndex = 2;
557 final int newEntryIndex = 4;
558 final int snapshotTerm = 1;
559 final int currentTerm = 2;
561 // set the snapshot variables in replicatedlog
562 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
563 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
564 actorContext.setCommitIndex(commitIndex);
565 //set follower timeout to 2 mins, helps during debugging
566 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
568 leader = new Leader(actorContext);
570 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
571 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
574 ReplicatedLogImplEntry entry =
575 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
576 new MockRaftActorContext.MockPayload("D"));
578 //update follower timestamp
579 leader.markFollowerActive(FOLLOWER_ID);
581 ByteString bs = toByteString(leadersSnapshot);
582 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
583 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
584 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
585 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
587 //send first chunk and no InstallSnapshotReply received yet
589 fts.incrementChunkIndex();
591 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
592 TimeUnit.MILLISECONDS);
594 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
596 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
598 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
600 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
602 //InstallSnapshotReply received
603 fts.markSendStatus(true);
605 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
607 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
609 assertEquals(commitIndex, is.getLastIncludedIndex());
613 public void testSendAppendEntriesSnapshotScenario() throws Exception {
614 logStart("testSendAppendEntriesSnapshotScenario");
616 MockRaftActorContext actorContext = createActorContextWithFollower();
618 Map<String, String> leadersSnapshot = new HashMap<>();
619 leadersSnapshot.put("1", "A");
620 leadersSnapshot.put("2", "B");
621 leadersSnapshot.put("3", "C");
624 actorContext.getReplicatedLog().removeFrom(0);
626 final int followersLastIndex = 2;
627 final int snapshotIndex = 3;
628 final int newEntryIndex = 4;
629 final int snapshotTerm = 1;
630 final int currentTerm = 2;
632 // set the snapshot variables in replicatedlog
633 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
634 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
635 actorContext.setCommitIndex(followersLastIndex);
637 leader = new Leader(actorContext);
639 // Leader will send an immediate heartbeat - ignore it.
640 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
643 ReplicatedLogImplEntry entry =
644 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
645 new MockRaftActorContext.MockPayload("D"));
647 actorContext.getReplicatedLog().append(entry);
649 //update follower timestamp
650 leader.markFollowerActive(FOLLOWER_ID);
652 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
653 RaftActorBehavior raftBehavior = leader.handleMessage(
654 leaderActor, new Replicate(null, "state-id", entry));
656 assertTrue(raftBehavior instanceof Leader);
658 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
662 public void testInitiateInstallSnapshot() throws Exception {
663 logStart("testInitiateInstallSnapshot");
665 MockRaftActorContext actorContext = createActorContextWithFollower();
668 actorContext.getReplicatedLog().removeFrom(0);
670 final int followersLastIndex = 2;
671 final int snapshotIndex = 3;
672 final int newEntryIndex = 4;
673 final int snapshotTerm = 1;
674 final int currentTerm = 2;
676 // set the snapshot variables in replicatedlog
677 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
678 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
679 actorContext.setLastApplied(3);
680 actorContext.setCommitIndex(followersLastIndex);
682 leader = new Leader(actorContext);
684 // Leader will send an immediate heartbeat - ignore it.
685 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
687 // set the snapshot as absent and check if capture-snapshot is invoked.
688 leader.setSnapshot(null);
691 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
692 new MockRaftActorContext.MockPayload("D"));
694 actorContext.getReplicatedLog().append(entry);
696 //update follower timestamp
697 leader.markFollowerActive(FOLLOWER_ID);
699 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
701 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
703 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
705 assertTrue(cs.isInstallSnapshotInitiated());
706 assertEquals(3, cs.getLastAppliedIndex());
707 assertEquals(1, cs.getLastAppliedTerm());
708 assertEquals(4, cs.getLastIndex());
709 assertEquals(2, cs.getLastTerm());
711 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
712 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
714 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
718 public void testInitiateForceInstallSnapshot() throws Exception {
719 logStart("testInitiateForceInstallSnapshot");
721 MockRaftActorContext actorContext = createActorContextWithFollower();
723 final int followersLastIndex = 2;
724 final int snapshotIndex = -1;
725 final int newEntryIndex = 4;
726 final int snapshotTerm = -1;
727 final int currentTerm = 2;
729 // set the snapshot variables in replicatedlog
730 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
731 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
732 actorContext.setLastApplied(3);
733 actorContext.setCommitIndex(followersLastIndex);
735 actorContext.getReplicatedLog().removeFrom(0);
737 leader = new Leader(actorContext);
739 // Leader will send an immediate heartbeat - ignore it.
740 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
742 // set the snapshot as absent and check if capture-snapshot is invoked.
743 leader.setSnapshot(null);
745 for(int i=0;i<4;i++) {
746 actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
747 new MockRaftActorContext.MockPayload("X" + i)));
751 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
752 new MockRaftActorContext.MockPayload("D"));
754 actorContext.getReplicatedLog().append(entry);
756 //update follower timestamp
757 leader.markFollowerActive(FOLLOWER_ID);
759 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
760 // installed with a SendInstallSnapshot
761 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
763 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
765 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
767 assertTrue(cs.isInstallSnapshotInitiated());
768 assertEquals(3, cs.getLastAppliedIndex());
769 assertEquals(1, cs.getLastAppliedTerm());
770 assertEquals(4, cs.getLastIndex());
771 assertEquals(2, cs.getLastTerm());
773 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
774 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
776 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
781 public void testInstallSnapshot() throws Exception {
782 logStart("testInstallSnapshot");
784 MockRaftActorContext actorContext = createActorContextWithFollower();
786 Map<String, String> leadersSnapshot = new HashMap<>();
787 leadersSnapshot.put("1", "A");
788 leadersSnapshot.put("2", "B");
789 leadersSnapshot.put("3", "C");
792 actorContext.getReplicatedLog().removeFrom(0);
794 final int lastAppliedIndex = 3;
795 final int snapshotIndex = 2;
796 final int snapshotTerm = 1;
797 final int currentTerm = 2;
799 // set the snapshot variables in replicatedlog
800 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
801 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
802 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
803 actorContext.setCommitIndex(lastAppliedIndex);
804 actorContext.setLastApplied(lastAppliedIndex);
806 leader = new Leader(actorContext);
808 // Initial heartbeat.
809 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
811 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
812 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
814 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
815 Collections.<ReplicatedLogEntry>emptyList(),
816 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
818 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
820 assertTrue(raftBehavior instanceof Leader);
822 // check if installsnapshot gets called with the correct values.
824 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
826 assertNotNull(installSnapshot.getData());
827 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
828 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
830 assertEquals(currentTerm, installSnapshot.getTerm());
834 public void testForceInstallSnapshot() throws Exception {
835 logStart("testForceInstallSnapshot");
837 MockRaftActorContext actorContext = createActorContextWithFollower();
839 Map<String, String> leadersSnapshot = new HashMap<>();
840 leadersSnapshot.put("1", "A");
841 leadersSnapshot.put("2", "B");
842 leadersSnapshot.put("3", "C");
844 final int lastAppliedIndex = 3;
845 final int snapshotIndex = -1;
846 final int snapshotTerm = -1;
847 final int currentTerm = 2;
849 // set the snapshot variables in replicatedlog
850 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
851 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
852 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
853 actorContext.setCommitIndex(lastAppliedIndex);
854 actorContext.setLastApplied(lastAppliedIndex);
856 leader = new Leader(actorContext);
858 // Initial heartbeat.
859 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
861 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
862 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
864 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
865 Collections.<ReplicatedLogEntry>emptyList(),
866 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
868 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
870 assertTrue(raftBehavior instanceof Leader);
872 // check if installsnapshot gets called with the correct values.
874 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
876 assertNotNull(installSnapshot.getData());
877 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
878 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
880 assertEquals(currentTerm, installSnapshot.getTerm());
884 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
885 logStart("testHandleInstallSnapshotReplyLastChunk");
887 MockRaftActorContext actorContext = createActorContextWithFollower();
889 final int commitIndex = 3;
890 final int snapshotIndex = 2;
891 final int snapshotTerm = 1;
892 final int currentTerm = 2;
894 actorContext.setCommitIndex(commitIndex);
896 leader = new Leader(actorContext);
898 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
899 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
901 // Ignore initial heartbeat.
902 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
904 Map<String, String> leadersSnapshot = new HashMap<>();
905 leadersSnapshot.put("1", "A");
906 leadersSnapshot.put("2", "B");
907 leadersSnapshot.put("3", "C");
909 // set the snapshot variables in replicatedlog
911 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
912 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
913 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
915 ByteString bs = toByteString(leadersSnapshot);
916 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
917 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
918 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
919 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
920 while(!fts.isLastChunk(fts.getChunkIndex())) {
922 fts.incrementChunkIndex();
926 actorContext.getReplicatedLog().removeFrom(0);
928 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
929 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
931 assertTrue(raftBehavior instanceof Leader);
933 assertEquals(0, leader.followerSnapshotSize());
934 assertEquals(1, leader.followerLogSize());
935 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
937 assertEquals(commitIndex, fli.getMatchIndex());
938 assertEquals(commitIndex + 1, fli.getNextIndex());
942 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
943 logStart("testSendSnapshotfromInstallSnapshotReply");
945 MockRaftActorContext actorContext = createActorContextWithFollower();
947 final int commitIndex = 3;
948 final int snapshotIndex = 2;
949 final int snapshotTerm = 1;
950 final int currentTerm = 2;
952 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
954 public int getSnapshotChunkSize() {
958 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
959 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
961 actorContext.setConfigParams(configParams);
962 actorContext.setCommitIndex(commitIndex);
964 leader = new Leader(actorContext);
966 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
967 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
969 Map<String, String> leadersSnapshot = new HashMap<>();
970 leadersSnapshot.put("1", "A");
971 leadersSnapshot.put("2", "B");
972 leadersSnapshot.put("3", "C");
974 // set the snapshot variables in replicatedlog
975 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
976 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
977 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
979 ByteString bs = toByteString(leadersSnapshot);
980 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
981 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
982 leader.setSnapshot(snapshot);
984 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
986 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
988 assertEquals(1, installSnapshot.getChunkIndex());
989 assertEquals(3, installSnapshot.getTotalChunks());
991 followerActor.underlyingActor().clear();
992 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
993 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
995 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
997 assertEquals(2, installSnapshot.getChunkIndex());
998 assertEquals(3, installSnapshot.getTotalChunks());
1000 followerActor.underlyingActor().clear();
1001 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1002 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1004 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1006 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1007 followerActor.underlyingActor().clear();
1008 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1009 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1011 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1013 Assert.assertNull(installSnapshot);
1018 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
1019 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1021 MockRaftActorContext actorContext = createActorContextWithFollower();
1023 final int commitIndex = 3;
1024 final int snapshotIndex = 2;
1025 final int snapshotTerm = 1;
1026 final int currentTerm = 2;
1028 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
1030 public int getSnapshotChunkSize() {
1035 actorContext.setCommitIndex(commitIndex);
1037 leader = new Leader(actorContext);
1039 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1040 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1042 Map<String, String> leadersSnapshot = new HashMap<>();
1043 leadersSnapshot.put("1", "A");
1044 leadersSnapshot.put("2", "B");
1045 leadersSnapshot.put("3", "C");
1047 // set the snapshot variables in replicatedlog
1048 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1049 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1050 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1052 ByteString bs = toByteString(leadersSnapshot);
1053 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1054 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1055 leader.setSnapshot(snapshot);
1057 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1058 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1060 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1062 assertEquals(1, installSnapshot.getChunkIndex());
1063 assertEquals(3, installSnapshot.getTotalChunks());
1065 followerActor.underlyingActor().clear();
1067 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1068 FOLLOWER_ID, -1, false));
1070 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1071 TimeUnit.MILLISECONDS);
1073 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1075 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1077 assertEquals(1, installSnapshot.getChunkIndex());
1078 assertEquals(3, installSnapshot.getTotalChunks());
1082 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1083 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1085 MockRaftActorContext actorContext = createActorContextWithFollower();
1087 final int commitIndex = 3;
1088 final int snapshotIndex = 2;
1089 final int snapshotTerm = 1;
1090 final int currentTerm = 2;
1092 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1094 public int getSnapshotChunkSize() {
1099 actorContext.setCommitIndex(commitIndex);
1101 leader = new Leader(actorContext);
1103 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1104 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1106 Map<String, String> leadersSnapshot = new HashMap<>();
1107 leadersSnapshot.put("1", "A");
1108 leadersSnapshot.put("2", "B");
1109 leadersSnapshot.put("3", "C");
1111 // set the snapshot variables in replicatedlog
1112 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1113 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1114 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1116 ByteString bs = toByteString(leadersSnapshot);
1117 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1118 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1119 leader.setSnapshot(snapshot);
1121 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1123 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1125 assertEquals(1, installSnapshot.getChunkIndex());
1126 assertEquals(3, installSnapshot.getTotalChunks());
1127 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
1129 int hashCode = Arrays.hashCode(installSnapshot.getData());
1131 followerActor.underlyingActor().clear();
1133 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1134 FOLLOWER_ID, 1, true));
1136 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1138 assertEquals(2, installSnapshot.getChunkIndex());
1139 assertEquals(3, installSnapshot.getTotalChunks());
1140 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1144 public void testFollowerToSnapshotLogic() {
1145 logStart("testFollowerToSnapshotLogic");
1147 MockRaftActorContext actorContext = createActorContext();
1149 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1151 public int getSnapshotChunkSize() {
1156 leader = new Leader(actorContext);
1158 Map<String, String> leadersSnapshot = new HashMap<>();
1159 leadersSnapshot.put("1", "A");
1160 leadersSnapshot.put("2", "B");
1161 leadersSnapshot.put("3", "C");
1163 ByteString bs = toByteString(leadersSnapshot);
1164 byte[] barray = bs.toByteArray();
1166 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1167 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1169 assertEquals(bs.size(), barray.length);
1172 for (int i=0; i < barray.length; i = i + 50) {
1176 if (i + 50 > barray.length) {
1180 byte[] chunk = fts.getNextChunk();
1181 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
1182 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1184 fts.markSendStatus(true);
1185 if (!fts.isLastChunk(chunkIndex)) {
1186 fts.incrementChunkIndex();
1190 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1193 @Override protected RaftActorBehavior createBehavior(
1194 RaftActorContext actorContext) {
1195 return new Leader(actorContext);
1199 protected MockRaftActorContext createActorContext() {
1200 return createActorContext(leaderActor);
1204 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1205 return createActorContext(LEADER_ID, actorRef);
1208 private MockRaftActorContext createActorContextWithFollower() {
1209 MockRaftActorContext actorContext = createActorContext();
1210 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1211 followerActor.path().toString()).build());
1212 return actorContext;
1215 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1216 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1217 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1218 configParams.setElectionTimeoutFactor(100000);
1219 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1220 context.setConfigParams(configParams);
1221 context.setPayloadVersion(payloadVersion);
1225 private MockRaftActorContext createFollowerActorContextWithLeader() {
1226 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1227 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1228 followerConfig.setElectionTimeoutFactor(10000);
1229 followerActorContext.setConfigParams(followerConfig);
1230 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1231 return followerActorContext;
1235 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1236 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1238 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1240 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1242 Follower follower = new Follower(followerActorContext);
1243 followerActor.underlyingActor().setBehavior(follower);
1245 Map<String, String> peerAddresses = new HashMap<>();
1246 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1248 leaderActorContext.setPeerAddresses(peerAddresses);
1250 leaderActorContext.getReplicatedLog().removeFrom(0);
1253 leaderActorContext.setReplicatedLog(
1254 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1256 leaderActorContext.setCommitIndex(1);
1258 followerActorContext.getReplicatedLog().removeFrom(0);
1260 // follower too has the exact same log entries and has the same commit index
1261 followerActorContext.setReplicatedLog(
1262 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1264 followerActorContext.setCommitIndex(1);
1266 leader = new Leader(leaderActorContext);
1268 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1270 assertEquals(1, appendEntries.getLeaderCommit());
1271 assertEquals(0, appendEntries.getEntries().size());
1272 assertEquals(0, appendEntries.getPrevLogIndex());
1274 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1275 leaderActor, AppendEntriesReply.class);
1277 assertEquals(2, appendEntriesReply.getLogLastIndex());
1278 assertEquals(1, appendEntriesReply.getLogLastTerm());
1280 // follower returns its next index
1281 assertEquals(2, appendEntriesReply.getLogLastIndex());
1282 assertEquals(1, appendEntriesReply.getLogLastTerm());
1288 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1289 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1291 MockRaftActorContext leaderActorContext = createActorContext();
1293 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1294 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1296 Follower follower = new Follower(followerActorContext);
1297 followerActor.underlyingActor().setBehavior(follower);
1299 Map<String, String> leaderPeerAddresses = new HashMap<>();
1300 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1302 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1304 leaderActorContext.getReplicatedLog().removeFrom(0);
1306 leaderActorContext.setReplicatedLog(
1307 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1309 leaderActorContext.setCommitIndex(1);
1311 followerActorContext.getReplicatedLog().removeFrom(0);
1313 followerActorContext.setReplicatedLog(
1314 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1316 // follower has the same log entries but its commit index > leaders commit index
1317 followerActorContext.setCommitIndex(2);
1319 leader = new Leader(leaderActorContext);
1321 // Initial heartbeat
1322 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1324 assertEquals(1, appendEntries.getLeaderCommit());
1325 assertEquals(0, appendEntries.getEntries().size());
1326 assertEquals(0, appendEntries.getPrevLogIndex());
1328 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1329 leaderActor, AppendEntriesReply.class);
1331 assertEquals(2, appendEntriesReply.getLogLastIndex());
1332 assertEquals(1, appendEntriesReply.getLogLastTerm());
1334 leaderActor.underlyingActor().setBehavior(follower);
1335 leader.handleMessage(followerActor, appendEntriesReply);
1337 leaderActor.underlyingActor().clear();
1338 followerActor.underlyingActor().clear();
1340 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1341 TimeUnit.MILLISECONDS);
1343 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1345 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1347 assertEquals(2, appendEntries.getLeaderCommit());
1348 assertEquals(0, appendEntries.getEntries().size());
1349 assertEquals(2, appendEntries.getPrevLogIndex());
1351 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1353 assertEquals(2, appendEntriesReply.getLogLastIndex());
1354 assertEquals(1, appendEntriesReply.getLogLastTerm());
1356 assertEquals(2, followerActorContext.getCommitIndex());
1362 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1363 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1365 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1366 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1367 new FiniteDuration(1000, TimeUnit.SECONDS));
1369 leaderActorContext.setReplicatedLog(
1370 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1371 long leaderCommitIndex = 2;
1372 leaderActorContext.setCommitIndex(leaderCommitIndex);
1373 leaderActorContext.setLastApplied(leaderCommitIndex);
1375 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1376 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1378 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1380 followerActorContext.setReplicatedLog(
1381 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1382 followerActorContext.setCommitIndex(0);
1383 followerActorContext.setLastApplied(0);
1385 Follower follower = new Follower(followerActorContext);
1386 followerActor.underlyingActor().setBehavior(follower);
1388 leader = new Leader(leaderActorContext);
1390 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1391 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1393 MessageCollectorActor.clearMessages(followerActor);
1394 MessageCollectorActor.clearMessages(leaderActor);
1396 // Verify initial AppendEntries sent with the leader's current commit index.
1397 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1398 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1399 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1401 leaderActor.underlyingActor().setBehavior(leader);
1403 leader.handleMessage(followerActor, appendEntriesReply);
1405 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1406 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1408 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1409 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1410 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1412 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1413 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1414 appendEntries.getEntries().get(0).getData());
1415 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1416 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1417 appendEntries.getEntries().get(1).getData());
1419 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1420 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1422 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1424 ApplyState applyState = applyStateList.get(0);
1425 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1426 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1427 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1428 applyState.getReplicatedLogEntry().getData());
1430 applyState = applyStateList.get(1);
1431 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1432 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1433 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1434 applyState.getReplicatedLogEntry().getData());
1436 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1437 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1441 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1442 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1444 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1445 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1446 new FiniteDuration(1000, TimeUnit.SECONDS));
1448 leaderActorContext.setReplicatedLog(
1449 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1450 long leaderCommitIndex = 1;
1451 leaderActorContext.setCommitIndex(leaderCommitIndex);
1452 leaderActorContext.setLastApplied(leaderCommitIndex);
1454 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1455 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1457 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1459 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1460 followerActorContext.setCommitIndex(-1);
1461 followerActorContext.setLastApplied(-1);
1463 Follower follower = new Follower(followerActorContext);
1464 followerActor.underlyingActor().setBehavior(follower);
1466 leader = new Leader(leaderActorContext);
1468 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1469 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1471 MessageCollectorActor.clearMessages(followerActor);
1472 MessageCollectorActor.clearMessages(leaderActor);
1474 // Verify initial AppendEntries sent with the leader's current commit index.
1475 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1476 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1477 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1479 leaderActor.underlyingActor().setBehavior(leader);
1481 leader.handleMessage(followerActor, appendEntriesReply);
1483 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1484 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1486 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1487 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1488 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1490 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1491 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1492 appendEntries.getEntries().get(0).getData());
1493 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1494 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1495 appendEntries.getEntries().get(1).getData());
1497 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1498 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1500 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1502 ApplyState applyState = applyStateList.get(0);
1503 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1504 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1505 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1506 applyState.getReplicatedLogEntry().getData());
1508 applyState = applyStateList.get(1);
1509 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1510 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1511 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1512 applyState.getReplicatedLogEntry().getData());
1514 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1515 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1519 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1520 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1522 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1523 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1524 new FiniteDuration(1000, TimeUnit.SECONDS));
1526 leaderActorContext.setReplicatedLog(
1527 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1528 long leaderCommitIndex = 1;
1529 leaderActorContext.setCommitIndex(leaderCommitIndex);
1530 leaderActorContext.setLastApplied(leaderCommitIndex);
1532 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1533 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1535 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1537 followerActorContext.setReplicatedLog(
1538 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1539 followerActorContext.setCommitIndex(-1);
1540 followerActorContext.setLastApplied(-1);
1542 Follower follower = new Follower(followerActorContext);
1543 followerActor.underlyingActor().setBehavior(follower);
1545 leader = new Leader(leaderActorContext);
1547 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1548 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1550 MessageCollectorActor.clearMessages(followerActor);
1551 MessageCollectorActor.clearMessages(leaderActor);
1553 // Verify initial AppendEntries sent with the leader's current commit index.
1554 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1555 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1556 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1558 leaderActor.underlyingActor().setBehavior(leader);
1560 leader.handleMessage(followerActor, appendEntriesReply);
1562 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1563 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1565 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1566 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1567 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1569 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1570 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1571 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1572 appendEntries.getEntries().get(0).getData());
1573 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1574 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1575 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1576 appendEntries.getEntries().get(1).getData());
1578 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1579 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1581 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1583 ApplyState applyState = applyStateList.get(0);
1584 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1585 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1586 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1587 applyState.getReplicatedLogEntry().getData());
1589 applyState = applyStateList.get(1);
1590 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1591 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1592 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1593 applyState.getReplicatedLogEntry().getData());
1595 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1596 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1597 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1601 public void testHandleAppendEntriesReplyWithNewerTerm(){
1602 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1604 MockRaftActorContext leaderActorContext = createActorContext();
1605 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1606 new FiniteDuration(10000, TimeUnit.SECONDS));
1608 leaderActorContext.setReplicatedLog(
1609 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1611 leader = new Leader(leaderActorContext);
1612 leaderActor.underlyingActor().setBehavior(leader);
1613 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1615 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1617 assertEquals(false, appendEntriesReply.isSuccess());
1618 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1620 MessageCollectorActor.clearMessages(leaderActor);
1624 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1625 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1627 MockRaftActorContext leaderActorContext = createActorContext();
1628 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1629 new FiniteDuration(10000, TimeUnit.SECONDS));
1631 leaderActorContext.setReplicatedLog(
1632 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1633 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1635 leader = new Leader(leaderActorContext);
1636 leaderActor.underlyingActor().setBehavior(leader);
1637 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1639 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1641 assertEquals(false, appendEntriesReply.isSuccess());
1642 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1644 MessageCollectorActor.clearMessages(leaderActor);
1648 public void testHandleAppendEntriesReplySuccess() throws Exception {
1649 logStart("testHandleAppendEntriesReplySuccess");
1651 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1653 leaderActorContext.setReplicatedLog(
1654 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1656 leaderActorContext.setCommitIndex(1);
1657 leaderActorContext.setLastApplied(1);
1658 leaderActorContext.getTermInformation().update(1, "leader");
1660 leader = new Leader(leaderActorContext);
1662 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1664 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1665 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1667 short payloadVersion = 5;
1668 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1670 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1672 assertEquals(RaftState.Leader, raftActorBehavior.state());
1674 assertEquals(2, leaderActorContext.getCommitIndex());
1676 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1677 leaderActor, ApplyJournalEntries.class);
1679 assertEquals(2, leaderActorContext.getLastApplied());
1681 assertEquals(2, applyJournalEntries.getToIndex());
1683 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1686 assertEquals(1,applyStateList.size());
1688 ApplyState applyState = applyStateList.get(0);
1690 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1692 assertEquals(2, followerInfo.getMatchIndex());
1693 assertEquals(3, followerInfo.getNextIndex());
1694 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1695 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1699 public void testHandleAppendEntriesReplyUnknownFollower(){
1700 logStart("testHandleAppendEntriesReplyUnknownFollower");
1702 MockRaftActorContext leaderActorContext = createActorContext();
1704 leader = new Leader(leaderActorContext);
1706 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1708 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1710 assertEquals(RaftState.Leader, raftActorBehavior.state());
1714 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1715 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1717 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1718 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1719 new FiniteDuration(1000, TimeUnit.SECONDS));
1720 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1722 leaderActorContext.setReplicatedLog(
1723 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1724 long leaderCommitIndex = 3;
1725 leaderActorContext.setCommitIndex(leaderCommitIndex);
1726 leaderActorContext.setLastApplied(leaderCommitIndex);
1728 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1729 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1730 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1731 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1733 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1735 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1736 followerActorContext.setCommitIndex(-1);
1737 followerActorContext.setLastApplied(-1);
1739 Follower follower = new Follower(followerActorContext);
1740 followerActor.underlyingActor().setBehavior(follower);
1742 leader = new Leader(leaderActorContext);
1744 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1745 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1747 MessageCollectorActor.clearMessages(followerActor);
1748 MessageCollectorActor.clearMessages(leaderActor);
1750 // Verify initial AppendEntries sent with the leader's current commit index.
1751 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1752 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1753 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1755 leaderActor.underlyingActor().setBehavior(leader);
1757 leader.handleMessage(followerActor, appendEntriesReply);
1759 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1760 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1762 appendEntries = appendEntriesList.get(0);
1763 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1764 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1765 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1767 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1768 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1769 appendEntries.getEntries().get(0).getData());
1770 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1771 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1772 appendEntries.getEntries().get(1).getData());
1774 appendEntries = appendEntriesList.get(1);
1775 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1776 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1777 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1779 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1780 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1781 appendEntries.getEntries().get(0).getData());
1782 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1783 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1784 appendEntries.getEntries().get(1).getData());
1786 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1787 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1789 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1791 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1792 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1796 public void testHandleRequestVoteReply(){
1797 logStart("testHandleRequestVoteReply");
1799 MockRaftActorContext leaderActorContext = createActorContext();
1801 leader = new Leader(leaderActorContext);
1803 // Should be a no-op.
1804 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1805 new RequestVoteReply(1, true));
1807 assertEquals(RaftState.Leader, raftActorBehavior.state());
1809 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1811 assertEquals(RaftState.Leader, raftActorBehavior.state());
1815 public void testIsolatedLeaderCheckNoFollowers() {
1816 logStart("testIsolatedLeaderCheckNoFollowers");
1818 MockRaftActorContext leaderActorContext = createActorContext();
1820 leader = new Leader(leaderActorContext);
1821 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1822 Assert.assertTrue(behavior instanceof Leader);
1825 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1826 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1827 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1829 MockRaftActorContext leaderActorContext = createActorContext();
1831 Map<String, String> peerAddresses = new HashMap<>();
1832 peerAddresses.put("follower-1", followerActor1.path().toString());
1833 peerAddresses.put("follower-2", followerActor2.path().toString());
1835 leaderActorContext.setPeerAddresses(peerAddresses);
1836 leaderActorContext.setRaftPolicy(raftPolicy);
1838 leader = new Leader(leaderActorContext);
1840 leader.markFollowerActive("follower-1");
1841 leader.markFollowerActive("follower-2");
1842 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1843 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1844 behavior instanceof Leader);
1846 // kill 1 follower and verify if that got killed
1847 final JavaTestKit probe = new JavaTestKit(getSystem());
1848 probe.watch(followerActor1);
1849 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1850 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1851 assertEquals(termMsg1.getActor(), followerActor1);
1853 leader.markFollowerInActive("follower-1");
1854 leader.markFollowerActive("follower-2");
1855 behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1856 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1857 behavior instanceof Leader);
1859 // kill 2nd follower and leader should change to Isolated leader
1860 followerActor2.tell(PoisonPill.getInstance(), null);
1861 probe.watch(followerActor2);
1862 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1863 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1864 assertEquals(termMsg2.getActor(), followerActor2);
1866 leader.markFollowerInActive("follower-2");
1867 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1871 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1872 logStart("testIsolatedLeaderCheckTwoFollowers");
1874 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1876 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1877 behavior instanceof IsolatedLeader);
1881 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1882 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1884 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1886 Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1887 behavior instanceof Leader);
1891 public void testLaggingFollowerStarvation() throws Exception {
1892 logStart("testLaggingFollowerStarvation");
1893 new JavaTestKit(getSystem()) {{
1894 String leaderActorId = actorFactory.generateActorId("leader");
1895 String follower1ActorId = actorFactory.generateActorId("follower");
1896 String follower2ActorId = actorFactory.generateActorId("follower");
1898 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1899 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1900 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1901 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1903 MockRaftActorContext leaderActorContext =
1904 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1906 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1907 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1908 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1910 leaderActorContext.setConfigParams(configParams);
1912 leaderActorContext.setReplicatedLog(
1913 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1915 Map<String, String> peerAddresses = new HashMap<>();
1916 peerAddresses.put(follower1ActorId,
1917 follower1Actor.path().toString());
1918 peerAddresses.put(follower2ActorId,
1919 follower2Actor.path().toString());
1921 leaderActorContext.setPeerAddresses(peerAddresses);
1922 leaderActorContext.getTermInformation().update(1, leaderActorId);
1924 RaftActorBehavior leader = createBehavior(leaderActorContext);
1926 leaderActor.underlyingActor().setBehavior(leader);
1928 for(int i=1;i<6;i++) {
1929 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1930 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1931 assertTrue(newBehavior == leader);
1932 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1935 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1936 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1938 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1939 heartbeats.size() > 1);
1941 // Check if follower-2 got AppendEntries during this time and was not starved
1942 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1944 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1945 appendEntries.size() > 1);
1951 public void testReplicationConsensusWithNonVotingFollower() {
1952 logStart("testReplicationConsensusWithNonVotingFollower");
1954 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1955 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1956 new FiniteDuration(1000, TimeUnit.SECONDS));
1958 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1960 String nonVotingFollowerId = "nonvoting-follower";
1961 TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1962 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1964 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1966 leader = new Leader(leaderActorContext);
1968 // Ignore initial heartbeats
1969 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1970 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1972 MessageCollectorActor.clearMessages(followerActor);
1973 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1974 MessageCollectorActor.clearMessages(leaderActor);
1976 // Send a Replicate message and wait for AppendEntries.
1977 sendReplicate(leaderActorContext, 0);
1979 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1980 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1982 // Send reply only from the voting follower and verify consensus via ApplyState.
1983 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1985 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1987 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
1989 MessageCollectorActor.clearMessages(followerActor);
1990 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1991 MessageCollectorActor.clearMessages(leaderActor);
1993 // Send another Replicate message
1994 sendReplicate(leaderActorContext, 1);
1996 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1997 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
1998 AppendEntries.class);
1999 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2000 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2002 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2003 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2005 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2007 // Send reply from the voting follower and verify consensus.
2008 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2010 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2014 public void testTransferLeadershipWithFollowerInSync() {
2015 logStart("testTransferLeadershipWithFollowerInSync");
2017 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2018 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2019 new FiniteDuration(1000, TimeUnit.SECONDS));
2020 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2022 leader = new Leader(leaderActorContext);
2024 // Initial heartbeat
2025 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2026 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2027 MessageCollectorActor.clearMessages(followerActor);
2029 sendReplicate(leaderActorContext, 0);
2030 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2032 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2033 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2034 MessageCollectorActor.clearMessages(followerActor);
2036 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2037 leader.transferLeadership(mockTransferCohort);
2039 verify(mockTransferCohort, never()).transferComplete();
2040 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2041 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2043 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2044 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2046 // Leader should force an election timeout
2047 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2049 verify(mockTransferCohort).transferComplete();
2053 public void testTransferLeadershipWithEmptyLog() {
2054 logStart("testTransferLeadershipWithEmptyLog");
2056 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2057 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2058 new FiniteDuration(1000, TimeUnit.SECONDS));
2059 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2061 leader = new Leader(leaderActorContext);
2063 // Initial heartbeat
2064 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2065 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2066 MessageCollectorActor.clearMessages(followerActor);
2068 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2069 leader.transferLeadership(mockTransferCohort);
2071 verify(mockTransferCohort, never()).transferComplete();
2072 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2073 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2075 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2076 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2078 // Leader should force an election timeout
2079 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2081 verify(mockTransferCohort).transferComplete();
2085 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2086 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2088 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2089 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2090 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2092 leader = new Leader(leaderActorContext);
2094 // Initial heartbeat
2095 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2096 MessageCollectorActor.clearMessages(followerActor);
2098 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2099 leader.transferLeadership(mockTransferCohort);
2101 verify(mockTransferCohort, never()).transferComplete();
2103 // Sync up the follower.
2104 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2105 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2106 MessageCollectorActor.clearMessages(followerActor);
2108 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2109 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2110 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2111 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2112 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2114 // Leader should force an election timeout
2115 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2117 verify(mockTransferCohort).transferComplete();
2121 public void testTransferLeadershipWithFollowerSyncTimeout() {
2122 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2124 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2125 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2126 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2127 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2128 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2130 leader = new Leader(leaderActorContext);
2132 // Initial heartbeat
2133 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2134 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2135 MessageCollectorActor.clearMessages(followerActor);
2137 sendReplicate(leaderActorContext, 0);
2138 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2140 MessageCollectorActor.clearMessages(followerActor);
2142 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2143 leader.transferLeadership(mockTransferCohort);
2145 verify(mockTransferCohort, never()).transferComplete();
2147 // Send heartbeats to time out the transfer.
2148 for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2149 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2150 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2151 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2154 verify(mockTransferCohort).abortTransfer();
2155 verify(mockTransferCohort, never()).transferComplete();
2156 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2160 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
2161 ActorRef actorRef, RaftRPC rpc) throws Exception {
2162 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2163 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2166 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2168 private final long electionTimeOutIntervalMillis;
2169 private final int snapshotChunkSize;
2171 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2173 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2174 this.snapshotChunkSize = snapshotChunkSize;
2178 public FiniteDuration getElectionTimeOutInterval() {
2179 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2183 public int getSnapshotChunkSize() {
2184 return snapshotChunkSize;