2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Mockito.mock;
15 import static org.mockito.Mockito.never;
16 import static org.mockito.Mockito.verify;
17 import akka.actor.ActorRef;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.collect.ImmutableMap;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import com.google.protobuf.ByteString;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.List;
31 import java.util.concurrent.TimeUnit;
32 import org.junit.After;
33 import org.junit.Assert;
34 import org.junit.Test;
35 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
36 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
37 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
38 import org.opendaylight.controller.cluster.raft.RaftActorContext;
39 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
40 import org.opendaylight.controller.cluster.raft.RaftState;
41 import org.opendaylight.controller.cluster.raft.RaftVersions;
42 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
43 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
44 import org.opendaylight.controller.cluster.raft.SerializationUtils;
45 import org.opendaylight.controller.cluster.raft.Snapshot;
46 import org.opendaylight.controller.cluster.raft.VotingState;
47 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
49 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
50 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
51 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
52 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
54 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
55 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
60 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
61 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
62 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
63 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
64 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
65 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
66 import scala.concurrent.duration.FiniteDuration;
68 public class LeaderTest extends AbstractLeaderTest {
// Peer id used for the follower in AppendEntriesReply messages and leader.getFollower() lookups.
70 static final String FOLLOWER_ID = "follower";
// Id constant for the leader; not referenced in the visible portion of this class.
71 public static final String LEADER_ID = "leader";
// Test actor that tests pass as the sender of SendHeartBeat/Replicate messages and inspect for ApplyState.
73 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
74 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
// Test actor whose collected messages (AppendEntries, InstallSnapshot) are inspected by the tests.
76 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
77 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
// Leader behavior under test; each test method assigns a fresh instance.
79 private Leader leader;
// Payload version constant shared by the tests.
80 private final short payloadVersion = 5;
84 public void tearDown() throws Exception {
// Verifies that handing the Leader a message it has no handler for (a plain String)
// returns a behavior that is still a Leader.
93 public void testHandleMessageForUnknownMessage() throws Exception {
94 logStart("testHandleMessageForUnknownMessage");
96 leader = new Leader(createActorContext());
98 // handle message should return the Leader state when it receives an
100 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
101 Assert.assertTrue(behavior instanceof Leader);
// Verifies the two AppendEntries messages a new Leader sends to its follower:
//  1. an immediate, empty heartbeat right after construction, and
//  2. after the follower acknowledges (lastIndex - 1) and the heartbeat interval elapses,
//     a heartbeat-triggered AppendEntries carrying the one entry the follower is missing.
// Both messages must carry the payload version configured on the actor context.
// The class-level payloadVersion field is used directly; the former local copy with the
// same value only shadowed it.
105 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
106 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
108 MockRaftActorContext actorContext = createActorContextWithFollower();
110 actorContext.setPayloadVersion(payloadVersion);
113 actorContext.getTermInformation().update(term, "");
115 leader = new Leader(actorContext);
117 // Leader should send an immediate heartbeat with no entries as follower is inactive.
118 long lastIndex = actorContext.getReplicatedLog().lastIndex();
119 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
120 assertEquals("getTerm", term, appendEntries.getTerm());
121 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
122 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
123 assertEquals("Entries size", 0, appendEntries.getEntries().size());
124 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
126 // The follower would normally reply - simulate that explicitly here.
// The reply reports lastIndex - 1, i.e. the follower is one entry behind the leader.
127 leader.handleMessage(followerActor, new AppendEntriesReply(
128 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
129 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so the next expectFirstMatching sees only the new message.
131 followerActor.underlyingActor().clear();
133 // Sleep for the heartbeat interval so AppendEntries is sent.
134 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
135 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
137 leader.handleMessage(leaderActor, new SendHeartBeat());
139 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
140 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
141 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
142 assertEquals("Entries size", 1, appendEntries.getEntries().size());
143 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
144 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
145 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
// Convenience overload: replicates a new "foo" entry at the given index using term 1.
// Returns whatever behavior the three-argument sendReplicate returns.
149 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
150 return sendReplicate(actorContext, 1, index);
// Appends a new log entry with payload "foo" at (term, index) to the context's replicated log,
// then hands the leader a Replicate message for that entry (with null client actor and identifier).
// Returns the behavior produced by leader.handleMessage.
153 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
154 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
155 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
156 term, index, payload);
157 actorContext.getReplicatedLog().append(newEntry);
158 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
// Verifies that, once the follower has acknowledged the leader's current last index, a Replicate
// message makes the leader send an AppendEntries containing exactly the new entry, and that the
// commit index is still the previous last index afterwards (no reply for the new entry yet).
162 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
163 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
165 MockRaftActorContext actorContext = createActorContextWithFollower();
168 actorContext.getTermInformation().update(term, "");
170 leader = new Leader(actorContext);
172 // Leader will send an immediate heartbeat - ignore it.
173 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
175 // The follower would normally reply - simulate that explicitly here.
176 long lastIndex = actorContext.getReplicatedLog().lastIndex();
177 leader.handleMessage(followerActor, new AppendEntriesReply(
178 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
179 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only the replication message is inspected below.
181 followerActor.underlyingActor().clear();
183 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
185 // State should not change
186 assertTrue(raftBehavior instanceof Leader);
188 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
189 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
190 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
191 assertEquals("Entries size", 1, appendEntries.getEntries().size());
192 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
193 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
194 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
195 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
// Verifies the Raft rule that entries from a previous term are not committed by counting replicas:
// the commit index stays at -1 when the follower acknowledges only old-term entries, and jumps to
// the new index once an entry from the current term has been acknowledged.
199 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
200 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
202 MockRaftActorContext actorContext = createActorContextWithFollower();
204 // The raft context is initialized with a couple log entries. However the commitIndex
205 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
206 // committed and applied. Now it regains leadership with a higher term (2).
207 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
208 long newTerm = prevTerm + 1;
209 actorContext.getTermInformation().update(newTerm, "");
211 leader = new Leader(actorContext);
213 // Leader will send an immediate heartbeat - ignore it.
214 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
216 // The follower replies with the leader's current last index and term, simulating that it is
217 // up to date with the leader.
218 long lastIndex = actorContext.getReplicatedLog().lastIndex();
219 leader.handleMessage(followerActor, new AppendEntriesReply(
220 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
222 // The commit index should not get updated even though consensus was reached. This is b/c the
223 // last entry's term does not match the current term. As per §5.4.1, "Raft never commits log entries
224 // from previous terms by counting replicas".
225 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
227 followerActor.underlyingActor().clear();
229 // Now replicate a new entry with the new term 2.
230 long newIndex = lastIndex + 1;
231 sendReplicate(actorContext, newTerm, newIndex);
233 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
234 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
235 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
236 assertEquals("Entries size", 1, appendEntries.getEntries().size());
237 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
238 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
239 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
241 // The follower replies with success. The leader should now update the commit index to the new index
242 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
243 // prior entries are committed indirectly".
244 leader.handleMessage(followerActor, new AppendEntriesReply(
245 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
247 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
// Verifies that, with the raft policy returned by createRaftPolicy(true, true), the commit index
// already equals the new entry's index right after the Replicate message is handled, i.e. before
// any follower reply for that entry has been delivered.
// NOTE(review): the meaning of the two boolean policy flags is defined in createRaftPolicy, which
// is not visible here - confirm against that helper.
251 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
252 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
254 MockRaftActorContext actorContext = createActorContextWithFollower();
255 actorContext.setRaftPolicy(createRaftPolicy(true, true));
258 actorContext.getTermInformation().update(term, "");
260 leader = new Leader(actorContext);
262 // Leader will send an immediate heartbeat - ignore it.
263 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
265 // The follower would normally reply - simulate that explicitly here.
266 long lastIndex = actorContext.getReplicatedLog().lastIndex();
267 leader.handleMessage(followerActor, new AppendEntriesReply(
268 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
269 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only the replication message is inspected below.
271 followerActor.underlyingActor().clear();
273 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
275 // State should not change
276 assertTrue(raftBehavior instanceof Leader);
278 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
279 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
280 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
281 assertEquals("Entries size", 1, appendEntries.getEntries().size());
282 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
283 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
284 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// Unlike the default-policy test, the commit index has already advanced to the new entry.
285 assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
// Verifies that five Replicate messages in a row, with no follower reply in between and a
// heartbeat interval (5 seconds) that cannot elapse during the test, result in exactly one
// AppendEntries being sent to the follower.
// The logStart argument now names this test; it previously logged another test's name.
289 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
290 logStart("testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent");
292 MockRaftActorContext actorContext = createActorContextWithFollower();
// Long heartbeat interval so that no heartbeat-driven resend can happen while the test runs.
293 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
295 public FiniteDuration getHeartBeatInterval() {
296 return FiniteDuration.apply(5, TimeUnit.SECONDS);
301 actorContext.getTermInformation().update(term, "");
303 leader = new Leader(actorContext);
305 // Leader will send an immediate heartbeat - ignore it.
306 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
308 // The follower would normally reply - simulate that explicitly here.
309 long lastIndex = actorContext.getReplicatedLog().lastIndex();
310 leader.handleMessage(followerActor, new AppendEntriesReply(
311 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
312 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so the count below covers replication messages only.
314 followerActor.underlyingActor().clear();
316 for(int i=0;i<5;i++) {
317 sendReplicate(actorContext, lastIndex+i+1);
320 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
321 // We expect only 1 message to be sent because of two reasons,
322 // - an append entries reply was not received
323 // - the heartbeat interval has not expired
324 // In this scenario if multiple messages are sent they would likely be duplicates
325 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
// Verifies that when each of the first three Replicate messages is followed by a successful
// follower reply, each produces its own AppendEntries, and that of the two further Replicate
// messages sent without replies only the first is forwarded - four AppendEntries in total.
// The final loop passes the expected index first and the observed index second, matching
// JUnit's assertEquals(expected, actual) contract so failure messages read correctly.
329 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
330 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
332 MockRaftActorContext actorContext = createActorContextWithFollower();
// Long heartbeat interval so that no heartbeat-driven resend can happen while the test runs.
333 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
335 public FiniteDuration getHeartBeatInterval() {
336 return FiniteDuration.apply(5, TimeUnit.SECONDS);
341 actorContext.getTermInformation().update(term, "");
343 leader = new Leader(actorContext);
345 // Leader will send an immediate heartbeat - ignore it.
346 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
348 // The follower would normally reply - simulate that explicitly here.
349 long lastIndex = actorContext.getReplicatedLog().lastIndex();
350 leader.handleMessage(followerActor, new AppendEntriesReply(
351 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
352 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so the count below covers replication messages only.
354 followerActor.underlyingActor().clear();
// First three entries: each Replicate is acknowledged by the follower before the next one.
356 for(int i=0;i<3;i++) {
357 sendReplicate(actorContext, lastIndex+i+1);
358 leader.handleMessage(followerActor, new AppendEntriesReply(
359 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
// Last two entries: no follower reply is simulated.
363 for(int i=3;i<5;i++) {
364 sendReplicate(actorContext, lastIndex + i + 1);
367 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
368 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
369 // get sent to the follower - but not the 5th
370 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
// The i-th collected message must carry the entry with index i + 2 as its first entry.
372 for(int i=0;i<4;i++) {
373 long actual = allMessages.get(i).getEntries().get(0).getIndex();
374 assertEquals(i+2, actual);
// Verifies that an entry which was replicated but never acknowledged is sent again when a
// heartbeat fires after the heartbeat interval (500 ms) has elapsed: two AppendEntries are
// collected and both carry the same single entry at lastIndex + 1.
// The final assertion inspects the second message (index 1); it previously re-checked the first
// message, leaving the duplicate's entry index unverified.
379 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
380 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
382 MockRaftActorContext actorContext = createActorContextWithFollower();
383 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
385 public FiniteDuration getHeartBeatInterval() {
386 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
391 actorContext.getTermInformation().update(term, "");
393 leader = new Leader(actorContext);
395 // Leader will send an immediate heartbeat - ignore it.
396 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
398 // The follower would normally reply - simulate that explicitly here.
399 long lastIndex = actorContext.getReplicatedLog().lastIndex();
400 leader.handleMessage(followerActor, new AppendEntriesReply(
401 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
402 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so the count below covers only the messages sent from here on.
404 followerActor.underlyingActor().clear();
406 sendReplicate(actorContext, lastIndex+1);
408 // Wait slightly longer than heartbeat duration
409 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
411 leader.handleMessage(leaderActor, new SendHeartBeat());
413 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
414 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// First message: the original replication of the new entry.
416 assertEquals(1, allMessages.get(0).getEntries().size());
417 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
// Second message: the heartbeat-triggered resend of the same entry.
418 assertEquals(1, allMessages.get(1).getEntries().size());
419 assertEquals(lastIndex+1, allMessages.get(1).getEntries().get(0).getIndex());
// Verifies that each SendHeartBeat handled after the heartbeat interval (100 ms) has elapsed
// produces an AppendEntries to the follower: three heartbeats spaced 150 ms apart yield three
// collected messages.
424 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
425 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
427 MockRaftActorContext actorContext = createActorContextWithFollower();
428 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
430 public FiniteDuration getHeartBeatInterval() {
431 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
436 actorContext.getTermInformation().update(term, "");
438 leader = new Leader(actorContext);
440 // Leader will send an immediate heartbeat - ignore it.
441 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
443 // The follower would normally reply - simulate that explicitly here.
444 long lastIndex = actorContext.getReplicatedLog().lastIndex();
445 leader.handleMessage(followerActor, new AppendEntriesReply(
446 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
447 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so the count below covers only the heartbeats sent in the loop.
449 followerActor.underlyingActor().clear();
// Each iteration waits longer than the 100 ms heartbeat interval before triggering a heartbeat.
451 for(int i=0;i<3;i++) {
452 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
453 leader.handleMessage(leaderActor, new SendHeartBeat());
456 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
457 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
// Verifies that a Replicate message handled right after a heartbeat is still sent to the
// follower: the heartbeat yields an empty AppendEntries and the Replicate yields a second
// AppendEntries with one entry.
461 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
462 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
464 MockRaftActorContext actorContext = createActorContextWithFollower();
465 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
467 public FiniteDuration getHeartBeatInterval() {
468 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
473 actorContext.getTermInformation().update(term, "");
475 leader = new Leader(actorContext);
477 // Leader will send an immediate heartbeat - ignore it.
478 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
480 // The follower would normally reply - simulate that explicitly here.
481 long lastIndex = actorContext.getReplicatedLog().lastIndex();
482 leader.handleMessage(followerActor, new AppendEntriesReply(
483 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
484 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only the two messages sent below are collected.
486 followerActor.underlyingActor().clear();
// Wait past the 100 ms heartbeat interval, fire a heartbeat, then replicate with no delay.
488 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
489 leader.handleMessage(leaderActor, new SendHeartBeat());
490 sendReplicate(actorContext, lastIndex+1);
492 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
493 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// Message 0 is the empty heartbeat; message 1 carries the replicated entry.
495 assertEquals(0, allMessages.get(0).getEntries().size());
496 assertEquals(1, allMessages.get(1).getEntries().size());
// Verifies that a leader created from createActorContext() (no follower configured by this test)
// commits a replicated entry immediately: the commit index becomes the new entry's index and
// ApplyState messages are delivered to the leader actor for every index from 1 up to the new one.
501 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
502 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
504 MockRaftActorContext actorContext = createActorContext();
506 leader = new Leader(actorContext);
// Pretend only index 0 has been applied so far, so later indices still need ApplyState.
508 actorContext.setLastApplied(0);
510 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
511 long term = actorContext.getTermInformation().getCurrentTerm();
512 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
513 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
515 actorContext.getReplicatedLog().append(newEntry);
517 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
518 new Replicate(leaderActor, "state-id", newEntry));
520 // State should not change
521 assertTrue(raftBehavior instanceof Leader);
523 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
525 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
526 // one since lastApplied state is 0.
527 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
528 leaderActor, ApplyState.class);
// The expected count is expressed as newLogIndex: one ApplyState per index in 1..newLogIndex.
529 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
531 for(int i = 0; i <= newLogIndex - 1; i++ ) {
532 ApplyState applyState = applyStateList.get(i);
533 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
534 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
// The last ApplyState must correspond to the newly replicated entry and carry its identifier.
537 ApplyState last = applyStateList.get((int) newLogIndex - 1);
538 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
539 assertEquals("getIdentifier", "state-id", last.getIdentifier());
// Verifies heartbeat handling while a snapshot install to the follower is in progress:
//  - while a chunk is outstanding (no InstallSnapshotReply yet), a heartbeat produces an
//    AppendEntries with no entries;
//  - once the chunk is marked as successfully sent, the next heartbeat produces an InstallSnapshot
//    whose lastIncludedIndex equals the commit index used to build the snapshot.
// NOTE(review): newEntryIndex, currentTerm and the local 'entry' are not used by any line visible
// in this method - confirm whether they can be removed.
543 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
544 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
546 MockRaftActorContext actorContext = createActorContextWithFollower();
// State that is serialized below and used as the leader's snapshot bytes.
548 Map<String, String> leadersSnapshot = new HashMap<>();
549 leadersSnapshot.put("1", "A");
550 leadersSnapshot.put("2", "B");
551 leadersSnapshot.put("3", "C");
// Remove all entries from the in-memory log, starting at index 0.
554 actorContext.getReplicatedLog().removeFrom(0);
556 final int commitIndex = 3;
557 final int snapshotIndex = 2;
558 final int newEntryIndex = 4;
559 final int snapshotTerm = 1;
560 final int currentTerm = 2;
562 // set the snapshot variables in replicatedlog
563 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
564 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
565 actorContext.setCommitIndex(commitIndex);
566 //set follower timeout to 2 mins, helps during debugging
567 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
569 leader = new Leader(actorContext);
// Reset the follower's replication progress to "nothing matched, next index 0".
571 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
572 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
575 ReplicatedLogImplEntry entry =
576 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
577 new MockRaftActorContext.MockPayload("D"));
579 //update follower timestamp
580 leader.markFollowerActive(FOLLOWER_ID);
// Give the leader a snapshot and register an in-progress snapshot transfer for the follower.
582 ByteString bs = toByteString(leadersSnapshot);
583 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
584 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
585 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
586 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
588 //send first chunk and no InstallSnapshotReply received yet
590 fts.incrementChunkIndex();
592 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
593 TimeUnit.MILLISECONDS);
595 leader.handleMessage(leaderActor, new SendHeartBeat());
597 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
599 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
601 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
603 //InstallSnapshotReply received
604 fts.markSendStatus(true);
606 leader.handleMessage(leaderActor, new SendHeartBeat());
608 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
610 assertEquals(commitIndex, is.getLastIncludedIndex());
// Verifies that replicating a new entry, with the log emptied and the snapshot index set to 3,
// leaves the behavior as Leader and puts the context's snapshot manager into the capturing state.
// NOTE(review): leadersSnapshot is populated but not read by any line visible in this method -
// confirm whether it is still needed.
614 public void testSendAppendEntriesSnapshotScenario() throws Exception {
615 logStart("testSendAppendEntriesSnapshotScenario");
617 MockRaftActorContext actorContext = createActorContextWithFollower();
619 Map<String, String> leadersSnapshot = new HashMap<>();
620 leadersSnapshot.put("1", "A");
621 leadersSnapshot.put("2", "B");
622 leadersSnapshot.put("3", "C");
// Remove all entries from the in-memory log, starting at index 0.
625 actorContext.getReplicatedLog().removeFrom(0);
627 final int followersLastIndex = 2;
628 final int snapshotIndex = 3;
629 final int newEntryIndex = 4;
630 final int snapshotTerm = 1;
631 final int currentTerm = 2;
633 // set the snapshot variables in replicatedlog
634 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
635 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
636 actorContext.setCommitIndex(followersLastIndex);
638 leader = new Leader(actorContext);
640 // Leader will send an immediate heartbeat - ignore it.
641 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// The new entry (index 4) is the only entry in the log after the removal above.
644 ReplicatedLogImplEntry entry =
645 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
646 new MockRaftActorContext.MockPayload("D"));
648 actorContext.getReplicatedLog().append(entry);
650 //update follower timestamp
651 leader.markFollowerActive(FOLLOWER_ID);
653 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
654 RaftActorBehavior raftBehavior = leader.handleMessage(
655 leaderActor, new Replicate(null, "state-id", entry));
657 assertTrue(raftBehavior instanceof Leader);
659 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
// Verifies that, when the leader has no snapshot set, replicating a new entry starts a snapshot
// capture flagged as install-snapshot-initiated with the expected last-applied and last index/term
// values, and that a second Replicate while the capture is in progress keeps the same
// CaptureSnapshot instance.
663 public void testInitiateInstallSnapshot() throws Exception {
664 logStart("testInitiateInstallSnapshot");
666 MockRaftActorContext actorContext = createActorContextWithFollower();
// Remove all entries from the in-memory log, starting at index 0.
669 actorContext.getReplicatedLog().removeFrom(0);
671 final int followersLastIndex = 2;
672 final int snapshotIndex = 3;
673 final int newEntryIndex = 4;
674 final int snapshotTerm = 1;
675 final int currentTerm = 2;
677 // set the snapshot variables in replicatedlog
678 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
679 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
680 actorContext.setLastApplied(3);
681 actorContext.setCommitIndex(followersLastIndex);
683 leader = new Leader(actorContext);
685 // Leader will send an immediate heartbeat - ignore it.
686 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
688 // set the snapshot as absent and check if capture-snapshot is invoked.
689 leader.setSnapshot(null);
692 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
693 new MockRaftActorContext.MockPayload("D"));
695 actorContext.getReplicatedLog().append(entry);
697 //update follower timestamp
698 leader.markFollowerActive(FOLLOWER_ID);
700 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
702 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
704 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
// Last-applied values mirror setLastApplied(3)/snapshotTerm; last index/term mirror the new entry.
706 assertTrue(cs.isInstallSnapshotInitiated());
707 assertEquals(3, cs.getLastAppliedIndex());
708 assertEquals(1, cs.getLastAppliedTerm());
709 assertEquals(4, cs.getLastIndex());
710 assertEquals(2, cs.getLastTerm());
712 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
713 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
715 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
// Verifies that an unsuccessful AppendEntriesReply constructed with a trailing 'true' argument
// makes the leader start a snapshot capture flagged as install-snapshot-initiated, even though no
// snapshot index/term is set on the log (both -1), and that a later Replicate while the capture is
// in progress keeps the same CaptureSnapshot instance.
// NOTE(review): the trailing boolean of AppendEntriesReply presumably means "force install
// snapshot" - confirm against the AppendEntriesReply constructor.
719 public void testInitiateForceInstallSnapshot() throws Exception {
720 logStart("testInitiateForceInstallSnapshot");
722 MockRaftActorContext actorContext = createActorContextWithFollower();
724 final int followersLastIndex = 2;
725 final int snapshotIndex = -1;
726 final int newEntryIndex = 4;
727 final int snapshotTerm = -1;
728 final int currentTerm = 2;
730 // set the snapshot variables in replicatedlog
731 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
732 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
733 actorContext.setLastApplied(3);
734 actorContext.setCommitIndex(followersLastIndex);
// Remove all entries from the in-memory log, starting at index 0; it is repopulated below.
736 actorContext.getReplicatedLog().removeFrom(0);
738 leader = new Leader(actorContext);
740 // Leader will send an immediate heartbeat - ignore it.
741 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
743 // set the snapshot as absent and check if capture-snapshot is invoked.
744 leader.setSnapshot(null);
// Rebuild the log with four entries (indices 0..3) created via ReplicatedLogImplEntry(i, 1, payload).
746 for(int i=0;i<4;i++) {
747 actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
748 new MockRaftActorContext.MockPayload("X" + i)));
752 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
753 new MockRaftActorContext.MockPayload("D"));
755 actorContext.getReplicatedLog().append(entry);
757 //update follower timestamp
758 leader.markFollowerActive(FOLLOWER_ID);
760 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
761 // installed with a SendInstallSnapshot
762 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
764 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
766 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
768 assertTrue(cs.isInstallSnapshotInitiated());
769 assertEquals(3, cs.getLastAppliedIndex());
770 assertEquals(1, cs.getLastAppliedTerm());
771 assertEquals(4, cs.getLastIndex());
772 assertEquals(2, cs.getLastTerm());
774 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
775 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
777 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
// Checks that handling SendInstallSnapshot keeps the Leader behavior and delivers an InstallSnapshot
// message to the follower carrying the snapshot's last-included index/term and the leader's current term.
782 public void testInstallSnapshot() throws Exception {
783 logStart("testInstallSnapshot");
785 MockRaftActorContext actorContext = createActorContextWithFollower();
// Arbitrary state used as the snapshot payload.
787 Map<String, String> leadersSnapshot = new HashMap<>();
788 leadersSnapshot.put("1", "A");
789 leadersSnapshot.put("2", "B");
790 leadersSnapshot.put("3", "C");
793 actorContext.getReplicatedLog().removeFrom(0);
795 final int lastAppliedIndex = 3;
796 final int snapshotIndex = 2;
797 final int snapshotTerm = 1;
798 final int currentTerm = 2;
800 // set the snapshot variables in replicatedlog
801 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
802 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
803 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
804 actorContext.setCommitIndex(lastAppliedIndex);
805 actorContext.setLastApplied(lastAppliedIndex);
807 leader = new Leader(actorContext);
809 // Initial heartbeat.
810 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Reset the follower's tracked indexes to matchIndex -1 / nextIndex 0 before sending the snapshot.
812 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
813 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// The snapshot is created with lastAppliedIndex/snapshotTerm passed for both index/term pairs.
815 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
816 Collections.<ReplicatedLogEntry>emptyList(),
817 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
819 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
821 assertTrue(raftBehavior instanceof Leader);
823 // check if installsnapshot gets called with the correct values.
825 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
827 assertNotNull(installSnapshot.getData());
828 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
829 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
831 assertEquals(currentTerm, installSnapshot.getTerm());
// Same flow as testInstallSnapshot, but with the log's snapshot index/term set to -1 and the follower's
// nextIndex set to -1: handling SendInstallSnapshot must still deliver an InstallSnapshot to the follower.
835 public void testForceInstallSnapshot() throws Exception {
836 logStart("testForceInstallSnapshot");
838 MockRaftActorContext actorContext = createActorContextWithFollower();
// Arbitrary state used as the snapshot payload.
840 Map<String, String> leadersSnapshot = new HashMap<>();
841 leadersSnapshot.put("1", "A");
842 leadersSnapshot.put("2", "B");
843 leadersSnapshot.put("3", "C");
845 final int lastAppliedIndex = 3;
846 final int snapshotIndex = -1;
847 final int snapshotTerm = -1;
848 final int currentTerm = 2;
850 // set the snapshot variables in replicatedlog
851 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
852 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
853 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
854 actorContext.setCommitIndex(lastAppliedIndex);
855 actorContext.setLastApplied(lastAppliedIndex);
857 leader = new Leader(actorContext);
859 // Initial heartbeat.
860 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Unlike testInstallSnapshot, nextIndex is -1 here rather than 0.
862 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
863 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
865 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
866 Collections.<ReplicatedLogEntry>emptyList(),
867 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
869 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
871 assertTrue(raftBehavior instanceof Leader);
873 // check if installsnapshot gets called with the correct values.
875 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
877 assertNotNull(installSnapshot.getData());
878 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
// snapshotTerm is -1 in this test, so the expected last-included term is -1 as well.
879 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
881 assertEquals(currentTerm, installSnapshot.getTerm());
// Checks that an InstallSnapshotReply for the final chunk removes the follower's snapshot-transfer state
// and moves the follower's matchIndex/nextIndex to commitIndex / commitIndex + 1.
885 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
886 logStart("testHandleInstallSnapshotReplyLastChunk");
888 MockRaftActorContext actorContext = createActorContextWithFollower();
890 final int commitIndex = 3;
891 final int snapshotIndex = 2;
892 final int snapshotTerm = 1;
893 final int currentTerm = 2;
895 actorContext.setCommitIndex(commitIndex);
897 leader = new Leader(actorContext);
899 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
900 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
902 // Ignore initial heartbeat.
903 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Arbitrary state used as the snapshot payload.
905 Map<String, String> leadersSnapshot = new HashMap<>();
906 leadersSnapshot.put("1", "A");
907 leadersSnapshot.put("2", "B");
908 leadersSnapshot.put("3", "C");
910 // set the snapshot variables in replicatedlog
912 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
913 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
914 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
// Install the snapshot and a FollowerToSnapshot tracker for the follower directly on the leader,
// without going through SendInstallSnapshot.
916 ByteString bs = toByteString(leadersSnapshot);
917 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
918 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
919 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
920 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
// Advance the tracker until its current chunk index is the last chunk.
921 while(!fts.isLastChunk(fts.getChunkIndex())) {
923 fts.incrementChunkIndex();
927 actorContext.getReplicatedLog().removeFrom(0);
// Reply for the last chunk (third argument is the chunk index; final flag presumably means success).
929 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
930 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
932 assertTrue(raftBehavior instanceof Leader);
// No snapshot transfer should remain tracked; the single follower is still tracked in the follower log map.
934 assertEquals(0, leader.followerSnapshotSize());
935 assertEquals(1, leader.followerLogSize());
936 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
938 assertEquals(commitIndex, fli.getMatchIndex());
939 assertEquals(commitIndex + 1, fli.getNextIndex());
// Checks that each successful InstallSnapshotReply triggers the next InstallSnapshot chunk (1, 2, then a
// third message), and that a reply to the last chunk does not cause any further InstallSnapshot message.
943 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
944 logStart("testSendSnapshotfromInstallSnapshotReply");
946 MockRaftActorContext actorContext = createActorContextWithFollower();
948 final int commitIndex = 3;
949 final int snapshotIndex = 2;
950 final int snapshotTerm = 1;
951 final int currentTerm = 2;
// Override the snapshot chunk size; the assertions below expect the snapshot to be split into 3 chunks.
953 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
955 public int getSnapshotChunkSize() {
// Heartbeat every 9s, isolated-leader check every 10s - presumably long enough that neither timer
// fires during the test (TODO confirm).
959 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
960 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
962 actorContext.setConfigParams(configParams);
963 actorContext.setCommitIndex(commitIndex);
965 leader = new Leader(actorContext);
967 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
968 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary state used as the snapshot payload.
970 Map<String, String> leadersSnapshot = new HashMap<>();
971 leadersSnapshot.put("1", "A");
972 leadersSnapshot.put("2", "B");
973 leadersSnapshot.put("3", "C");
975 // set the snapshot variables in replicatedlog
976 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
977 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
978 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
980 ByteString bs = toByteString(leadersSnapshot);
981 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
982 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
983 leader.setSnapshot(snapshot);
// Kick off the transfer; the first chunk should arrive at the follower.
985 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
987 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
989 assertEquals(1, installSnapshot.getChunkIndex());
990 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 1; clearing the collector first ensures the next match is a new message.
992 followerActor.underlyingActor().clear();
993 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
994 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
996 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
998 assertEquals(2, installSnapshot.getChunkIndex());
999 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 2; another InstallSnapshot is expected (its chunk index is not asserted here).
1001 followerActor.underlyingActor().clear();
1002 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1003 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1005 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1007 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1008 followerActor.underlyingActor().clear();
1009 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1010 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
// getFirstMatching (as opposed to expectFirstMatching) is used because no message is expected.
1012 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1014 Assert.assertNull(installSnapshot);
// Checks that after an InstallSnapshotReply carrying chunk index -1 and a false flag, the next heartbeat
// makes the leader send chunk 1 of the snapshot again.
1019 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
1020 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1022 MockRaftActorContext actorContext = createActorContextWithFollower();
1024 final int commitIndex = 3;
1025 final int snapshotIndex = 2;
1026 final int snapshotTerm = 1;
1027 final int currentTerm = 2;
// Override the snapshot chunk size; the assertions below expect the snapshot to be split into 3 chunks.
1029 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
1031 public int getSnapshotChunkSize() {
1036 actorContext.setCommitIndex(commitIndex);
1038 leader = new Leader(actorContext);
1040 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1041 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary state used as the snapshot payload.
1043 Map<String, String> leadersSnapshot = new HashMap<>();
1044 leadersSnapshot.put("1", "A");
1045 leadersSnapshot.put("2", "B");
1046 leadersSnapshot.put("3", "C");
1048 // set the snapshot variables in replicatedlog
1049 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1050 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1051 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1053 ByteString bs = toByteString(leadersSnapshot);
1054 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1055 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1056 leader.setSnapshot(snapshot);
// NOTE(review): the reason for this fixed 1s pause is not evident from the test itself - confirm it is
// still required, since it adds a full second to every run.
1058 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1059 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1061 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1063 assertEquals(1, installSnapshot.getChunkIndex());
1064 assertEquals(3, installSnapshot.getTotalChunks());
1066 followerActor.underlyingActor().clear();
// Reply with a chunk index (-1) that does not match the chunk that was sent, flagged false.
1068 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1069 FOLLOWER_ID, -1, false));
// Wait one heartbeat interval before delivering SendHeartBeat to the leader by hand.
1071 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1072 TimeUnit.MILLISECONDS);
1074 leader.handleMessage(leaderActor, new SendHeartBeat());
// The first chunk is expected to be sent again.
1076 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1078 assertEquals(1, installSnapshot.getChunkIndex());
1079 assertEquals(3, installSnapshot.getTotalChunks());
// Checks the lastChunkHashCode carried by InstallSnapshot messages: the first chunk carries
// AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, and the second carries Arrays.hashCode of the first chunk's data.
1083 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1084 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1086 MockRaftActorContext actorContext = createActorContextWithFollower();
1088 final int commitIndex = 3;
1089 final int snapshotIndex = 2;
1090 final int snapshotTerm = 1;
1091 final int currentTerm = 2;
// Override the snapshot chunk size; the assertions below expect the snapshot to be split into 3 chunks.
1093 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1095 public int getSnapshotChunkSize() {
1100 actorContext.setCommitIndex(commitIndex);
1102 leader = new Leader(actorContext);
1104 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1105 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary state used as the snapshot payload.
1107 Map<String, String> leadersSnapshot = new HashMap<>();
1108 leadersSnapshot.put("1", "A");
1109 leadersSnapshot.put("2", "B");
1110 leadersSnapshot.put("3", "C");
1112 // set the snapshot variables in replicatedlog
1113 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1114 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1115 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1117 ByteString bs = toByteString(leadersSnapshot);
1118 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1119 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1120 leader.setSnapshot(snapshot);
1122 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1124 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
// First chunk: there is no previous chunk, so the initial hash-code constant is expected.
1126 assertEquals(1, installSnapshot.getChunkIndex());
1127 assertEquals(3, installSnapshot.getTotalChunks());
1128 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
// Remember the hash of chunk 1's bytes to compare against what chunk 2 reports.
1130 int hashCode = Arrays.hashCode(installSnapshot.getData());
1132 followerActor.underlyingActor().clear();
// Acknowledge chunk 1 so that the leader sends chunk 2.
1134 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1135 FOLLOWER_ID, 1, true));
1137 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1139 assertEquals(2, installSnapshot.getChunkIndex());
1140 assertEquals(3, installSnapshot.getTotalChunks());
1141 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
// Exercises FollowerToSnapshot chunking directly: walks the serialized snapshot in steps of 50 bytes and
// checks each chunk's length, the running chunk index, and finally the total chunk count.
1145 public void testFollowerToSnapshotLogic() {
1146 logStart("testFollowerToSnapshotLogic");
1148 MockRaftActorContext actorContext = createActorContext();
// Override the snapshot chunk size - presumably 50 bytes, to match the step used by the loop below
// (TODO confirm).
1150 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1152 public int getSnapshotChunkSize() {
1157 leader = new Leader(actorContext);
// Arbitrary state used as the snapshot payload.
1159 Map<String, String> leadersSnapshot = new HashMap<>();
1160 leadersSnapshot.put("1", "A");
1161 leadersSnapshot.put("2", "B");
1162 leadersSnapshot.put("3", "C");
1164 ByteString bs = toByteString(leadersSnapshot);
1165 byte[] barray = bs.toByteArray();
1167 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1168 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1170 assertEquals(bs.size(), barray.length);
// i is the start offset of the current chunk; the loop advances 50 bytes per iteration.
1173 for (int i=0; i < barray.length; i = i + 50) {
// Handles the final chunk, which may be shorter than 50 bytes.
1177 if (i + 50 > barray.length) {
// Each chunk is expected to be j-i bytes long, and the tracker's chunk index to equal chunkIndex.
1181 byte[] chunk = fts.getNextChunk();
1182 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
1183 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
// Mark the chunk as sent and move on, unless this was the last chunk.
1185 fts.markSendStatus(true);
1186 if (!fts.isLastChunk(chunkIndex)) {
1187 fts.incrementChunkIndex();
1191 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
// Supplies the behavior under test: a Leader built on the given context.
1194 @Override protected RaftActorBehavior createBehavior(
1195 RaftActorContext actorContext) {
1196 return new Leader(actorContext);
// Creates a mock context backed by leaderActor.
1200 protected MockRaftActorContext createActorContext() {
1201 return createActorContext(leaderActor);
// Creates a mock context with id LEADER_ID backed by the given actor.
1205 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1206 return createActorContext(LEADER_ID, actorRef);
// Creates the leader's mock context and registers followerActor as its single peer under FOLLOWER_ID.
1209 private MockRaftActorContext createActorContextWithFollower() {
1210 MockRaftActorContext actorContext = createActorContext();
1211 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1212 followerActor.path().toString()).build());
1213 return actorContext;
// Builds a mock context for the given id/actor with a 50 ms heartbeat interval, an election timeout
// factor of 100000 and the test's payloadVersion.
1216 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1217 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1218 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
// Very large factor - presumably so that no election timeout occurs while a test runs (TODO confirm).
1219 configParams.setElectionTimeoutFactor(100000);
1220 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1221 context.setConfigParams(configParams);
1222 context.setPayloadVersion(payloadVersion);
// Creates a mock context for the follower (FOLLOWER_ID / followerActor) whose only peer is the leader.
1226 private MockRaftActorContext createFollowerActorContextWithLeader() {
1227 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Replaces the config installed by createActorContext with a fresh one, so only the election timeout
// factor set here is customized; the heartbeat interval falls back to DefaultConfigParamsImpl's default.
1228 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1229 followerConfig.setElectionTimeoutFactor(10000);
1230 followerActorContext.setConfigParams(followerConfig);
1231 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1232 return followerActorContext;
// Leader and follower start with the same log and the same commit index (1), which is below the last log
// index. Checks the initial AppendEntries sent by the new leader and the follower's reply.
1236 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1237 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1239 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1241 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Let followerActor process incoming messages with a real Follower behavior.
1243 Follower follower = new Follower(followerActorContext);
1244 followerActor.underlyingActor().setBehavior(follower);
1246 Map<String, String> peerAddresses = new HashMap<>();
1247 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1249 leaderActorContext.setPeerAddresses(peerAddresses);
1251 leaderActorContext.getReplicatedLog().removeFrom(0);
// presumably createEntries(0, 3, 1) yields entries 0..2 in term 1 - consistent with the reply's
// last index 2 / last term 1 asserted below
1254 leaderActorContext.setReplicatedLog(
1255 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1257 leaderActorContext.setCommitIndex(1);
1259 followerActorContext.getReplicatedLog().removeFrom(0);
1261 // follower too has the exact same log entries and has the same commit index
1262 followerActorContext.setReplicatedLog(
1263 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1265 followerActorContext.setCommitIndex(1);
1267 leader = new Leader(leaderActorContext);
// Initial AppendEntries from the new leader: carries the leader's commit index and no entries.
1269 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1271 assertEquals(1, appendEntries.getLeaderCommit());
1272 assertEquals(0, appendEntries.getEntries().size());
1273 assertEquals(0, appendEntries.getPrevLogIndex());
1275 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1276 leaderActor, AppendEntriesReply.class);
1278 assertEquals(2, appendEntriesReply.getLogLastIndex());
1279 assertEquals(1, appendEntriesReply.getLogLastTerm());
1281 // NOTE(review): the two assertions below repeat the two above (last log index/term of the reply)
1282 assertEquals(2, appendEntriesReply.getLogLastIndex());
1283 assertEquals(1, appendEntriesReply.getLogLastTerm());
// Leader and follower have the same log, but the follower's commit index (2) is ahead of the leader's (1).
// Checks the initial AppendEntries, then - after the leader processes the follower's reply - that the next
// heartbeat carries leaderCommit 2 and prevLogIndex 2, and that the follower's commit index stays at 2.
1289 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1290 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1292 MockRaftActorContext leaderActorContext = createActorContext();
1294 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1295 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
// Let followerActor process incoming messages with a real Follower behavior.
1297 Follower follower = new Follower(followerActorContext);
1298 followerActor.underlyingActor().setBehavior(follower);
1300 Map<String, String> leaderPeerAddresses = new HashMap<>();
1301 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1303 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1305 leaderActorContext.getReplicatedLog().removeFrom(0);
1307 leaderActorContext.setReplicatedLog(
1308 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1310 leaderActorContext.setCommitIndex(1);
1312 followerActorContext.getReplicatedLog().removeFrom(0);
1314 followerActorContext.setReplicatedLog(
1315 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1317 // follower has the same log entries but its commit index > leaders commit index
1318 followerActorContext.setCommitIndex(2);
1320 leader = new Leader(leaderActorContext);
1322 // Initial heartbeat
1323 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1325 assertEquals(1, appendEntries.getLeaderCommit());
1326 assertEquals(0, appendEntries.getEntries().size());
1327 assertEquals(0, appendEntries.getPrevLogIndex());
1329 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1330 leaderActor, AppendEntriesReply.class);
1332 assertEquals(2, appendEntriesReply.getLogLastIndex());
1333 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): this installs the *follower* behavior on leaderActor, whereas the sibling
// testHandleAppendEntriesReplyFailure* tests install `leader` at the equivalent point - confirm intent.
1335 leaderActor.underlyingActor().setBehavior(follower);
// Feed the follower's reply to the leader behavior directly.
1336 leader.handleMessage(followerActor, appendEntriesReply);
1338 leaderActor.underlyingActor().clear();
1339 followerActor.underlyingActor().clear();
// Wait one heartbeat interval before delivering SendHeartBeat to the leader by hand.
1341 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1342 TimeUnit.MILLISECONDS);
1344 leader.handleMessage(leaderActor, new SendHeartBeat());
1346 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// The heartbeat is now expected to carry commit index 2 and prevLogIndex 2, still with no entries.
1348 assertEquals(2, appendEntries.getLeaderCommit());
1349 assertEquals(0, appendEntries.getEntries().size());
1350 assertEquals(2, appendEntries.getPrevLogIndex());
1352 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1354 assertEquals(2, appendEntriesReply.getLogLastIndex());
1355 assertEquals(1, appendEntriesReply.getLogLastTerm());
1357 assertEquals(2, followerActorContext.getCommitIndex());
// The follower's log (createEntries(0, 1, 1), commit index 0) is behind the leader's (createEntries(0, 3, 1),
// commit index 2). Checks that after the leader handles the follower's reply to the initial AppendEntries,
// it sends the two missing entries (indexes 1 and 2) and the follower applies and commits them.
1363 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1364 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1366 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// 1000 s heartbeat interval - presumably so that no scheduled heartbeat interferes with the message
// sequence asserted below (TODO confirm).
1367 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1368 new FiniteDuration(1000, TimeUnit.SECONDS));
1370 leaderActorContext.setReplicatedLog(
1371 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1372 long leaderCommitIndex = 2;
1373 leaderActorContext.setCommitIndex(leaderCommitIndex);
1374 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the leader's entries at indexes 1 and 2 to compare payloads later.
1376 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1377 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1379 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1381 followerActorContext.setReplicatedLog(
1382 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1383 followerActorContext.setCommitIndex(0);
1384 followerActorContext.setLastApplied(0);
// Let followerActor process incoming messages with a real Follower behavior.
1386 Follower follower = new Follower(followerActorContext);
1387 followerActor.underlyingActor().setBehavior(follower);
1389 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply to it, then clear both collectors so that
// later expectations only see new messages.
1391 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1392 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1394 MessageCollectorActor.clearMessages(followerActor);
1395 MessageCollectorActor.clearMessages(leaderActor);
1397 // Verify initial AppendEntries sent with the leader's current commit index.
1398 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1399 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1400 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
// presumably routes subsequent messages arriving at leaderActor through the leader behavior
1402 leaderActor.underlyingActor().setBehavior(leader);
// Feed the captured reply to the leader; it is expected to respond with the missing entries.
1404 leader.handleMessage(followerActor, appendEntriesReply);
1406 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1407 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1409 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1410 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1411 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1413 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1414 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1415 appendEntries.getEntries().get(0).getData());
1416 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1417 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1418 appendEntries.getEntries().get(1).getData());
1420 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1421 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
// The follower is expected to emit one ApplyState per newly received entry.
1423 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1425 ApplyState applyState = applyStateList.get(0);
1426 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1427 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1428 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1429 applyState.getReplicatedLogEntry().getData());
1431 applyState = applyStateList.get(1);
1432 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1433 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1434 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1435 applyState.getReplicatedLogEntry().getData());
// The follower should end up level with the leader.
1437 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1438 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
// The follower starts with an empty log (commit index -1) while the leader has createEntries(0, 2, 1) with
// commit index 1. Checks that after the leader handles the follower's reply to the initial AppendEntries,
// it sends both entries (indexes 0 and 1) and the follower applies and commits them.
1442 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1443 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1445 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// 1000 s heartbeat interval - presumably so that no scheduled heartbeat interferes with the message
// sequence asserted below (TODO confirm).
1446 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1447 new FiniteDuration(1000, TimeUnit.SECONDS));
1449 leaderActorContext.setReplicatedLog(
1450 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1451 long leaderCommitIndex = 1;
1452 leaderActorContext.setCommitIndex(leaderCommitIndex);
1453 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the leader's entries at indexes 0 and 1 to compare payloads later.
1455 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1456 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1458 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// No createEntries call: the follower's log is built without any entries.
1460 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1461 followerActorContext.setCommitIndex(-1);
1462 followerActorContext.setLastApplied(-1);
// Let followerActor process incoming messages with a real Follower behavior.
1464 Follower follower = new Follower(followerActorContext);
1465 followerActor.underlyingActor().setBehavior(follower);
1467 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply to it, then clear both collectors so that
// later expectations only see new messages.
1469 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1470 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1472 MessageCollectorActor.clearMessages(followerActor);
1473 MessageCollectorActor.clearMessages(leaderActor);
1475 // Verify initial AppendEntries sent with the leader's current commit index.
1476 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1477 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1478 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
// presumably routes subsequent messages arriving at leaderActor through the leader behavior
1480 leaderActor.underlyingActor().setBehavior(leader);
// Feed the captured reply to the leader; it is expected to respond with the full log.
1482 leader.handleMessage(followerActor, appendEntriesReply);
1484 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1485 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1487 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1488 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1489 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1491 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1492 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1493 appendEntries.getEntries().get(0).getData());
1494 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1495 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1496 appendEntries.getEntries().get(1).getData());
1498 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1499 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
// The follower is expected to emit one ApplyState per newly received entry.
1501 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1503 ApplyState applyState = applyStateList.get(0);
1504 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1505 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1506 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1507 applyState.getReplicatedLogEntry().getData());
1509 applyState = applyStateList.get(1);
1510 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1511 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1512 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1513 applyState.getReplicatedLogEntry().getData());
// The follower should end up level with the leader.
1515 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1516 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1520 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1521 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1523 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1524 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1525 new FiniteDuration(1000, TimeUnit.SECONDS));
1527 leaderActorContext.setReplicatedLog(
1528 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1529 long leaderCommitIndex = 1;
1530 leaderActorContext.setCommitIndex(leaderCommitIndex);
1531 leaderActorContext.setLastApplied(leaderCommitIndex);
1533 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1534 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1536 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1538 followerActorContext.setReplicatedLog(
1539 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1540 followerActorContext.setCommitIndex(-1);
1541 followerActorContext.setLastApplied(-1);
1543 Follower follower = new Follower(followerActorContext);
1544 followerActor.underlyingActor().setBehavior(follower);
1546 leader = new Leader(leaderActorContext);
1548 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1549 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1551 MessageCollectorActor.clearMessages(followerActor);
1552 MessageCollectorActor.clearMessages(leaderActor);
1554 // Verify initial AppendEntries sent with the leader's current commit index.
1555 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1556 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1557 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1559 leaderActor.underlyingActor().setBehavior(leader);
1561 leader.handleMessage(followerActor, appendEntriesReply);
1563 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1564 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1566 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1567 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1568 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1570 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1571 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1572 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1573 appendEntries.getEntries().get(0).getData());
1574 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1575 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1576 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1577 appendEntries.getEntries().get(1).getData());
1579 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1580 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1582 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1584 ApplyState applyState = applyStateList.get(0);
1585 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1586 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1587 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1588 applyState.getReplicatedLogEntry().getData());
1590 applyState = applyStateList.get(1);
1591 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1592 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1593 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1594 applyState.getReplicatedLogEntry().getData());
1596 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1597 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1598 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1602 public void testHandleAppendEntriesReplyWithNewerTerm(){
1603 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1605 MockRaftActorContext leaderActorContext = createActorContext();
1606 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1607 new FiniteDuration(10000, TimeUnit.SECONDS));
1609 leaderActorContext.setReplicatedLog(
1610 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1612 leader = new Leader(leaderActorContext);
1613 leaderActor.underlyingActor().setBehavior(leader);
1614 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1616 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1618 assertEquals(false, appendEntriesReply.isSuccess());
1619 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1621 MessageCollectorActor.clearMessages(leaderActor);
1625 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1626 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1628 MockRaftActorContext leaderActorContext = createActorContext();
1629 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1630 new FiniteDuration(10000, TimeUnit.SECONDS));
1632 leaderActorContext.setReplicatedLog(
1633 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1634 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1636 leader = new Leader(leaderActorContext);
1637 leaderActor.underlyingActor().setBehavior(leader);
1638 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1640 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1642 assertEquals(false, appendEntriesReply.isSuccess());
1643 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1645 MessageCollectorActor.clearMessages(leaderActor);
1649 public void testHandleAppendEntriesReplySuccess() throws Exception {
1650 logStart("testHandleAppendEntriesReplySuccess");
1652 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1654 leaderActorContext.setReplicatedLog(
1655 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1657 leaderActorContext.setCommitIndex(1);
1658 leaderActorContext.setLastApplied(1);
1659 leaderActorContext.getTermInformation().update(1, "leader");
1661 leader = new Leader(leaderActorContext);
1663 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1665 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1666 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1668 short payloadVersion = 5;
1669 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1671 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1673 assertEquals(RaftState.Leader, raftActorBehavior.state());
1675 assertEquals(2, leaderActorContext.getCommitIndex());
1677 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1678 leaderActor, ApplyJournalEntries.class);
1680 assertEquals(2, leaderActorContext.getLastApplied());
1682 assertEquals(2, applyJournalEntries.getToIndex());
1684 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1687 assertEquals(1,applyStateList.size());
1689 ApplyState applyState = applyStateList.get(0);
1691 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1693 assertEquals(2, followerInfo.getMatchIndex());
1694 assertEquals(3, followerInfo.getNextIndex());
1695 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1696 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1700 public void testHandleAppendEntriesReplyUnknownFollower(){
1701 logStart("testHandleAppendEntriesReplyUnknownFollower");
1703 MockRaftActorContext leaderActorContext = createActorContext();
1705 leader = new Leader(leaderActorContext);
1707 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1709 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1711 assertEquals(RaftState.Leader, raftActorBehavior.state());
1715 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1716 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1718 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1719 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1720 new FiniteDuration(1000, TimeUnit.SECONDS));
1721 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1723 leaderActorContext.setReplicatedLog(
1724 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1725 long leaderCommitIndex = 3;
1726 leaderActorContext.setCommitIndex(leaderCommitIndex);
1727 leaderActorContext.setLastApplied(leaderCommitIndex);
1729 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1730 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1731 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1732 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1734 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1736 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1737 followerActorContext.setCommitIndex(-1);
1738 followerActorContext.setLastApplied(-1);
1740 Follower follower = new Follower(followerActorContext);
1741 followerActor.underlyingActor().setBehavior(follower);
1743 leader = new Leader(leaderActorContext);
1745 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1746 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1748 MessageCollectorActor.clearMessages(followerActor);
1749 MessageCollectorActor.clearMessages(leaderActor);
1751 // Verify initial AppendEntries sent with the leader's current commit index.
1752 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1753 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1754 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1756 leaderActor.underlyingActor().setBehavior(leader);
1758 leader.handleMessage(followerActor, appendEntriesReply);
1760 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1761 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1763 appendEntries = appendEntriesList.get(0);
1764 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1765 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1766 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1768 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1769 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1770 appendEntries.getEntries().get(0).getData());
1771 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1772 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1773 appendEntries.getEntries().get(1).getData());
1775 appendEntries = appendEntriesList.get(1);
1776 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1777 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1778 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1780 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1781 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1782 appendEntries.getEntries().get(0).getData());
1783 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1784 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1785 appendEntries.getEntries().get(1).getData());
1787 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1788 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1790 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1792 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1793 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1797 public void testHandleRequestVoteReply(){
1798 logStart("testHandleRequestVoteReply");
1800 MockRaftActorContext leaderActorContext = createActorContext();
1802 leader = new Leader(leaderActorContext);
1804 // Should be a no-op.
1805 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1806 new RequestVoteReply(1, true));
1808 assertEquals(RaftState.Leader, raftActorBehavior.state());
1810 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1812 assertEquals(RaftState.Leader, raftActorBehavior.state());
1816 public void testIsolatedLeaderCheckNoFollowers() {
1817 logStart("testIsolatedLeaderCheckNoFollowers");
1819 MockRaftActorContext leaderActorContext = createActorContext();
1821 leader = new Leader(leaderActorContext);
1822 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1823 Assert.assertTrue(behavior instanceof Leader);
1826 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1827 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1828 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1830 MockRaftActorContext leaderActorContext = createActorContext();
1832 Map<String, String> peerAddresses = new HashMap<>();
1833 peerAddresses.put("follower-1", followerActor1.path().toString());
1834 peerAddresses.put("follower-2", followerActor2.path().toString());
1836 leaderActorContext.setPeerAddresses(peerAddresses);
1837 leaderActorContext.setRaftPolicy(raftPolicy);
1839 leader = new Leader(leaderActorContext);
1841 leader.markFollowerActive("follower-1");
1842 leader.markFollowerActive("follower-2");
1843 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1844 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1845 behavior instanceof Leader);
1847 // kill 1 follower and verify if that got killed
1848 final JavaTestKit probe = new JavaTestKit(getSystem());
1849 probe.watch(followerActor1);
1850 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1851 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1852 assertEquals(termMsg1.getActor(), followerActor1);
1854 leader.markFollowerInActive("follower-1");
1855 leader.markFollowerActive("follower-2");
1856 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1857 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1858 behavior instanceof Leader);
1860 // kill 2nd follower and leader should change to Isolated leader
1861 followerActor2.tell(PoisonPill.getInstance(), null);
1862 probe.watch(followerActor2);
1863 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1864 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1865 assertEquals(termMsg2.getActor(), followerActor2);
1867 leader.markFollowerInActive("follower-2");
1868 return leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1872 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1873 logStart("testIsolatedLeaderCheckTwoFollowers");
1875 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1877 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1878 behavior instanceof IsolatedLeader);
1882 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1883 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1885 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1887 Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1888 behavior instanceof Leader);
1892 public void testLaggingFollowerStarvation() throws Exception {
1893 logStart("testLaggingFollowerStarvation");
1894 new JavaTestKit(getSystem()) {{
1895 String leaderActorId = actorFactory.generateActorId("leader");
1896 String follower1ActorId = actorFactory.generateActorId("follower");
1897 String follower2ActorId = actorFactory.generateActorId("follower");
1899 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1900 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1901 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1902 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1904 MockRaftActorContext leaderActorContext =
1905 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1907 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1908 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1909 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1911 leaderActorContext.setConfigParams(configParams);
1913 leaderActorContext.setReplicatedLog(
1914 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1916 Map<String, String> peerAddresses = new HashMap<>();
1917 peerAddresses.put(follower1ActorId,
1918 follower1Actor.path().toString());
1919 peerAddresses.put(follower2ActorId,
1920 follower2Actor.path().toString());
1922 leaderActorContext.setPeerAddresses(peerAddresses);
1923 leaderActorContext.getTermInformation().update(1, leaderActorId);
1925 RaftActorBehavior leader = createBehavior(leaderActorContext);
1927 leaderActor.underlyingActor().setBehavior(leader);
1929 for(int i=1;i<6;i++) {
1930 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1931 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1932 assertTrue(newBehavior == leader);
1933 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1936 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1937 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1939 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1940 heartbeats.size() > 1);
1942 // Check if follower-2 got AppendEntries during this time and was not starved
1943 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1945 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1946 appendEntries.size() > 1);
1952 public void testReplicationConsensusWithNonVotingFollower() {
1953 logStart("testReplicationConsensusWithNonVotingFollower");
1955 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1956 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1957 new FiniteDuration(1000, TimeUnit.SECONDS));
1959 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1961 String nonVotingFollowerId = "nonvoting-follower";
1962 TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1963 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1965 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1967 leader = new Leader(leaderActorContext);
1969 // Ignore initial heartbeats
1970 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1971 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1973 MessageCollectorActor.clearMessages(followerActor);
1974 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1975 MessageCollectorActor.clearMessages(leaderActor);
1977 // Send a Replicate message and wait for AppendEntries.
1978 sendReplicate(leaderActorContext, 0);
1980 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1981 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1983 // Send reply only from the voting follower and verify consensus via ApplyState.
1984 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1986 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1988 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
1990 MessageCollectorActor.clearMessages(followerActor);
1991 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1992 MessageCollectorActor.clearMessages(leaderActor);
1994 // Send another Replicate message
1995 sendReplicate(leaderActorContext, 1);
1997 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1998 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
1999 AppendEntries.class);
2000 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2001 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2003 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2004 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2006 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2008 // Send reply from the voting follower and verify consensus.
2009 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2011 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2015 public void testTransferLeadershipWithFollowerInSync() {
2016 logStart("testTransferLeadershipWithFollowerInSync");
2018 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2019 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2020 new FiniteDuration(1000, TimeUnit.SECONDS));
2021 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2023 leader = new Leader(leaderActorContext);
2025 // Initial heartbeat
2026 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2027 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2028 MessageCollectorActor.clearMessages(followerActor);
2030 sendReplicate(leaderActorContext, 0);
2031 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2033 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2034 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2035 MessageCollectorActor.clearMessages(followerActor);
2037 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2038 leader.transferLeadership(mockTransferCohort);
2040 verify(mockTransferCohort, never()).transferComplete();
2041 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2042 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2044 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2045 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2047 // Leader should force an election timeout
2048 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2050 verify(mockTransferCohort).transferComplete();
2054 public void testTransferLeadershipWithEmptyLog() {
2055 logStart("testTransferLeadershipWithEmptyLog");
2057 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2058 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2059 new FiniteDuration(1000, TimeUnit.SECONDS));
2060 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2062 leader = new Leader(leaderActorContext);
2064 // Initial heartbeat
2065 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2066 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2067 MessageCollectorActor.clearMessages(followerActor);
2069 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2070 leader.transferLeadership(mockTransferCohort);
2072 verify(mockTransferCohort, never()).transferComplete();
2073 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2074 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2076 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2077 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2079 // Leader should force an election timeout
2080 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2082 verify(mockTransferCohort).transferComplete();
2086 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2087 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2089 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2090 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2091 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2093 leader = new Leader(leaderActorContext);
2095 // Initial heartbeat
2096 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2097 MessageCollectorActor.clearMessages(followerActor);
2099 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2100 leader.transferLeadership(mockTransferCohort);
2102 verify(mockTransferCohort, never()).transferComplete();
2104 // Sync up the follower.
2105 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2106 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2107 MessageCollectorActor.clearMessages(followerActor);
2109 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2110 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2111 leader.handleMessage(leaderActor, new SendHeartBeat());
2112 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2113 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2115 // Leader should force an election timeout
2116 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2118 verify(mockTransferCohort).transferComplete();
2122 public void testTransferLeadershipWithFollowerSyncTimeout() {
2123 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2125 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2126 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2127 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2128 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2129 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2131 leader = new Leader(leaderActorContext);
2133 // Initial heartbeat
2134 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2135 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2136 MessageCollectorActor.clearMessages(followerActor);
2138 sendReplicate(leaderActorContext, 0);
2139 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2141 MessageCollectorActor.clearMessages(followerActor);
2143 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2144 leader.transferLeadership(mockTransferCohort);
2146 verify(mockTransferCohort, never()).transferComplete();
2148 // Send heartbeats to time out the transfer.
2149 for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2150 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2151 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2152 leader.handleMessage(leaderActor, new SendHeartBeat());
2155 verify(mockTransferCohort).abortTransfer();
2156 verify(mockTransferCohort, never()).transferComplete();
2157 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2161 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
2162 ActorRef actorRef, RaftRPC rpc) throws Exception {
2163 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2164 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2167 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2169 private final long electionTimeOutIntervalMillis;
2170 private final int snapshotChunkSize;
2172 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2174 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2175 this.snapshotChunkSize = snapshotChunkSize;
2179 public FiniteDuration getElectionTimeOutInterval() {
2180 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2184 public int getSnapshotChunkSize() {
2185 return snapshotChunkSize;