2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorRef;
20 import akka.actor.PoisonPill;
21 import akka.actor.Props;
22 import akka.actor.Terminated;
23 import akka.testkit.JavaTestKit;
24 import akka.testkit.TestActorRef;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.google.protobuf.ByteString;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.List;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
37 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
38 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
39 import org.opendaylight.controller.cluster.raft.RaftActorContext;
40 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
41 import org.opendaylight.controller.cluster.raft.RaftState;
42 import org.opendaylight.controller.cluster.raft.RaftVersions;
43 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
44 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
45 import org.opendaylight.controller.cluster.raft.SerializationUtils;
46 import org.opendaylight.controller.cluster.raft.Snapshot;
47 import org.opendaylight.controller.cluster.raft.VotingState;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
49 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
50 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
51 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
52 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
54 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
55 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
56 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
58 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
60 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
61 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
62 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
63 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
64 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
65 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
66 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
67 import org.opendaylight.yangtools.concepts.Identifier;
68 import scala.concurrent.duration.FiniteDuration;
70 public class LeaderTest extends AbstractLeaderTest<Leader> {
72 static final String FOLLOWER_ID = "follower";
73 public static final String LEADER_ID = "leader";
75 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
76 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
78 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
79 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
81 private Leader leader;
82 private final short payloadVersion = 5;
86 public void tearDown() throws Exception {
95 public void testHandleMessageForUnknownMessage() throws Exception {
96 logStart("testHandleMessageForUnknownMessage");
98 leader = new Leader(createActorContext());
100 // handle message should null when it receives an unknown message
101 assertNull(leader.handleMessage(followerActor, "foo"));
105 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
106 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
108 MockRaftActorContext actorContext = createActorContextWithFollower();
109 short payloadVersion = (short)5;
110 actorContext.setPayloadVersion(payloadVersion);
113 actorContext.getTermInformation().update(term, "");
115 leader = new Leader(actorContext);
116 actorContext.setCurrentBehavior(leader);
118 // Leader should send an immediate heartbeat with no entries as follower is inactive.
119 long lastIndex = actorContext.getReplicatedLog().lastIndex();
120 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
121 assertEquals("getTerm", term, appendEntries.getTerm());
122 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
123 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
124 assertEquals("Entries size", 0, appendEntries.getEntries().size());
125 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
127 // The follower would normally reply - simulate that explicitly here.
128 leader.handleMessage(followerActor, new AppendEntriesReply(
129 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
130 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
132 followerActor.underlyingActor().clear();
134 // Sleep for the heartbeat interval so AppendEntries is sent.
135 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
136 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
138 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
140 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
141 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
142 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
143 assertEquals("Entries size", 1, appendEntries.getEntries().size());
144 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
145 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
146 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
150 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
151 return sendReplicate(actorContext, 1, index);
154 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
155 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
156 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
157 term, index, payload);
158 actorContext.getReplicatedLog().append(newEntry);
159 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
163 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
164 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
166 MockRaftActorContext actorContext = createActorContextWithFollower();
169 actorContext.getTermInformation().update(term, "");
171 leader = new Leader(actorContext);
173 // Leader will send an immediate heartbeat - ignore it.
174 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
176 // The follower would normally reply - simulate that explicitly here.
177 long lastIndex = actorContext.getReplicatedLog().lastIndex();
178 leader.handleMessage(followerActor, new AppendEntriesReply(
179 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
180 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
182 followerActor.underlyingActor().clear();
184 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
186 // State should not change
187 assertTrue(raftBehavior instanceof Leader);
189 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
190 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
191 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
192 assertEquals("Entries size", 1, appendEntries.getEntries().size());
193 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
194 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
195 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
196 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
200 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
201 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
203 MockRaftActorContext actorContext = createActorContextWithFollower();
205 // The raft context is initialized with a couple log entries. However the commitIndex
206 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
207 // committed and applied. Now it regains leadership with a higher term (2).
208 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
209 long newTerm = prevTerm + 1;
210 actorContext.getTermInformation().update(newTerm, "");
212 leader = new Leader(actorContext);
213 actorContext.setCurrentBehavior(leader);
215 // Leader will send an immediate heartbeat - ignore it.
216 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
218 // The follower replies with the leader's current last index and term, simulating that it is
219 // up to date with the leader.
220 long lastIndex = actorContext.getReplicatedLog().lastIndex();
221 leader.handleMessage(followerActor, new AppendEntriesReply(
222 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
224 // The commit index should not get updated even though consensus was reached. This is b/c the
225 // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
226 // from previous terms by counting replicas".
227 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
229 followerActor.underlyingActor().clear();
231 // Now replicate a new entry with the new term 2.
232 long newIndex = lastIndex + 1;
233 sendReplicate(actorContext, newTerm, newIndex);
235 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
236 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
237 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
238 assertEquals("Entries size", 1, appendEntries.getEntries().size());
239 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
240 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
241 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
243 // The follower replies with success. The leader should now update the commit index to the new index
244 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
245 // prior entries are committed indirectly".
246 leader.handleMessage(followerActor, new AppendEntriesReply(
247 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
249 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
253 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
254 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
256 MockRaftActorContext actorContext = createActorContextWithFollower();
257 actorContext.setRaftPolicy(createRaftPolicy(true, true));
260 actorContext.getTermInformation().update(term, "");
262 leader = new Leader(actorContext);
264 // Leader will send an immediate heartbeat - ignore it.
265 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
267 // The follower would normally reply - simulate that explicitly here.
268 long lastIndex = actorContext.getReplicatedLog().lastIndex();
269 leader.handleMessage(followerActor, new AppendEntriesReply(
270 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
271 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
273 followerActor.underlyingActor().clear();
275 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
277 // State should not change
278 assertTrue(raftBehavior instanceof Leader);
280 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
281 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
282 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
283 assertEquals("Entries size", 1, appendEntries.getEntries().size());
284 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
285 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
286 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
287 assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
291 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
292 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
294 MockRaftActorContext actorContext = createActorContextWithFollower();
295 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
297 public FiniteDuration getHeartBeatInterval() {
298 return FiniteDuration.apply(5, TimeUnit.SECONDS);
303 actorContext.getTermInformation().update(term, "");
305 leader = new Leader(actorContext);
307 // Leader will send an immediate heartbeat - ignore it.
308 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
310 // The follower would normally reply - simulate that explicitly here.
311 long lastIndex = actorContext.getReplicatedLog().lastIndex();
312 leader.handleMessage(followerActor, new AppendEntriesReply(
313 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
314 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
316 followerActor.underlyingActor().clear();
318 for(int i=0;i<5;i++) {
319 sendReplicate(actorContext, lastIndex+i+1);
322 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
323 // We expect only 1 message to be sent because of two reasons,
324 // - an append entries reply was not received
325 // - the heartbeat interval has not expired
326 // In this scenario if multiple messages are sent they would likely be duplicates
327 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
331 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
332 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
334 MockRaftActorContext actorContext = createActorContextWithFollower();
335 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
337 public FiniteDuration getHeartBeatInterval() {
338 return FiniteDuration.apply(5, TimeUnit.SECONDS);
343 actorContext.getTermInformation().update(term, "");
345 leader = new Leader(actorContext);
347 // Leader will send an immediate heartbeat - ignore it.
348 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
350 // The follower would normally reply - simulate that explicitly here.
351 long lastIndex = actorContext.getReplicatedLog().lastIndex();
352 leader.handleMessage(followerActor, new AppendEntriesReply(
353 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
354 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
356 followerActor.underlyingActor().clear();
358 for(int i=0;i<3;i++) {
359 sendReplicate(actorContext, lastIndex+i+1);
360 leader.handleMessage(followerActor, new AppendEntriesReply(
361 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
365 for(int i=3;i<5;i++) {
366 sendReplicate(actorContext, lastIndex + i + 1);
369 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
370 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
371 // get sent to the follower - but not the 5th
372 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
374 for(int i=0;i<4;i++) {
375 long expected = allMessages.get(i).getEntries().get(0).getIndex();
376 assertEquals(expected, i+2);
381 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
382 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
384 MockRaftActorContext actorContext = createActorContextWithFollower();
385 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
387 public FiniteDuration getHeartBeatInterval() {
388 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
393 actorContext.getTermInformation().update(term, "");
395 leader = new Leader(actorContext);
397 // Leader will send an immediate heartbeat - ignore it.
398 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
400 // The follower would normally reply - simulate that explicitly here.
401 long lastIndex = actorContext.getReplicatedLog().lastIndex();
402 leader.handleMessage(followerActor, new AppendEntriesReply(
403 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
404 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
406 followerActor.underlyingActor().clear();
408 sendReplicate(actorContext, lastIndex+1);
410 // Wait slightly longer than heartbeat duration
411 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
413 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
415 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
416 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
418 assertEquals(1, allMessages.get(0).getEntries().size());
419 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
420 assertEquals(1, allMessages.get(1).getEntries().size());
421 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
426 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
427 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
429 MockRaftActorContext actorContext = createActorContextWithFollower();
430 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
432 public FiniteDuration getHeartBeatInterval() {
433 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
438 actorContext.getTermInformation().update(term, "");
440 leader = new Leader(actorContext);
442 // Leader will send an immediate heartbeat - ignore it.
443 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
445 // The follower would normally reply - simulate that explicitly here.
446 long lastIndex = actorContext.getReplicatedLog().lastIndex();
447 leader.handleMessage(followerActor, new AppendEntriesReply(
448 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
449 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
451 followerActor.underlyingActor().clear();
453 for(int i=0;i<3;i++) {
454 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
455 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
458 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
459 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
463 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
464 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
466 MockRaftActorContext actorContext = createActorContextWithFollower();
467 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
469 public FiniteDuration getHeartBeatInterval() {
470 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
475 actorContext.getTermInformation().update(term, "");
477 leader = new Leader(actorContext);
479 // Leader will send an immediate heartbeat - ignore it.
480 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
482 // The follower would normally reply - simulate that explicitly here.
483 long lastIndex = actorContext.getReplicatedLog().lastIndex();
484 leader.handleMessage(followerActor, new AppendEntriesReply(
485 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
486 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
488 followerActor.underlyingActor().clear();
490 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
491 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
492 sendReplicate(actorContext, lastIndex+1);
494 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
495 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
497 assertEquals(0, allMessages.get(0).getEntries().size());
498 assertEquals(1, allMessages.get(1).getEntries().size());
503 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
504 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
506 MockRaftActorContext actorContext = createActorContext();
508 leader = new Leader(actorContext);
510 actorContext.setLastApplied(0);
512 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
513 long term = actorContext.getTermInformation().getCurrentTerm();
514 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
515 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
517 actorContext.getReplicatedLog().append(newEntry);
519 final Identifier id = new MockIdentifier("state-id");
520 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
522 // State should not change
523 assertTrue(raftBehavior instanceof Leader);
525 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
527 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
528 // one since lastApplied state is 0.
529 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
530 leaderActor, ApplyState.class);
531 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
533 for(int i = 0; i <= newLogIndex - 1; i++ ) {
534 ApplyState applyState = applyStateList.get(i);
535 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
536 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
539 ApplyState last = applyStateList.get((int) newLogIndex - 1);
540 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
541 assertEquals("getIdentifier", id, last.getIdentifier());
545 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
546 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
548 MockRaftActorContext actorContext = createActorContextWithFollower();
550 Map<String, String> leadersSnapshot = new HashMap<>();
551 leadersSnapshot.put("1", "A");
552 leadersSnapshot.put("2", "B");
553 leadersSnapshot.put("3", "C");
556 actorContext.getReplicatedLog().removeFrom(0);
558 final int commitIndex = 3;
559 final int snapshotIndex = 2;
560 final int newEntryIndex = 4;
561 final int snapshotTerm = 1;
562 final int currentTerm = 2;
564 // set the snapshot variables in replicatedlog
565 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
566 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
567 actorContext.setCommitIndex(commitIndex);
568 //set follower timeout to 2 mins, helps during debugging
569 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
571 leader = new Leader(actorContext);
573 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
574 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
577 ReplicatedLogImplEntry entry =
578 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
579 new MockRaftActorContext.MockPayload("D"));
581 //update follower timestamp
582 leader.markFollowerActive(FOLLOWER_ID);
584 ByteString bs = toByteString(leadersSnapshot);
585 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
586 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
587 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
588 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
590 //send first chunk and no InstallSnapshotReply received yet
592 fts.incrementChunkIndex();
594 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
595 TimeUnit.MILLISECONDS);
597 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
599 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
601 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
603 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
605 //InstallSnapshotReply received
606 fts.markSendStatus(true);
608 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
610 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
612 assertEquals(commitIndex, is.getLastIncludedIndex());
616 public void testSendAppendEntriesSnapshotScenario() throws Exception {
617 logStart("testSendAppendEntriesSnapshotScenario");
619 MockRaftActorContext actorContext = createActorContextWithFollower();
621 Map<String, String> leadersSnapshot = new HashMap<>();
622 leadersSnapshot.put("1", "A");
623 leadersSnapshot.put("2", "B");
624 leadersSnapshot.put("3", "C");
627 actorContext.getReplicatedLog().removeFrom(0);
629 final int followersLastIndex = 2;
630 final int snapshotIndex = 3;
631 final int newEntryIndex = 4;
632 final int snapshotTerm = 1;
633 final int currentTerm = 2;
635 // set the snapshot variables in replicatedlog
636 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
637 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
638 actorContext.setCommitIndex(followersLastIndex);
640 leader = new Leader(actorContext);
642 // Leader will send an immediate heartbeat - ignore it.
643 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
646 ReplicatedLogImplEntry entry =
647 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
648 new MockRaftActorContext.MockPayload("D"));
650 actorContext.getReplicatedLog().append(entry);
652 //update follower timestamp
653 leader.markFollowerActive(FOLLOWER_ID);
655 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
656 RaftActorBehavior raftBehavior = leader.handleMessage(
657 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
659 assertTrue(raftBehavior instanceof Leader);
661 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
665 public void testInitiateInstallSnapshot() throws Exception {
666 logStart("testInitiateInstallSnapshot");
668 MockRaftActorContext actorContext = createActorContextWithFollower();
671 actorContext.getReplicatedLog().removeFrom(0);
673 final int followersLastIndex = 2;
674 final int snapshotIndex = 3;
675 final int newEntryIndex = 4;
676 final int snapshotTerm = 1;
677 final int currentTerm = 2;
679 // set the snapshot variables in replicatedlog
680 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
681 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
682 actorContext.setLastApplied(3);
683 actorContext.setCommitIndex(followersLastIndex);
685 leader = new Leader(actorContext);
687 // Leader will send an immediate heartbeat - ignore it.
688 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
690 // set the snapshot as absent and check if capture-snapshot is invoked.
691 leader.setSnapshot(null);
694 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
695 new MockRaftActorContext.MockPayload("D"));
697 actorContext.getReplicatedLog().append(entry);
699 //update follower timestamp
700 leader.markFollowerActive(FOLLOWER_ID);
702 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
704 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
706 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
708 assertTrue(cs.isInstallSnapshotInitiated());
709 assertEquals(3, cs.getLastAppliedIndex());
710 assertEquals(1, cs.getLastAppliedTerm());
711 assertEquals(4, cs.getLastIndex());
712 assertEquals(2, cs.getLastTerm());
714 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
715 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
717 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
721 public void testInitiateForceInstallSnapshot() throws Exception {
722 logStart("testInitiateForceInstallSnapshot");
724 MockRaftActorContext actorContext = createActorContextWithFollower();
726 final int followersLastIndex = 2;
727 final int snapshotIndex = -1;
728 final int newEntryIndex = 4;
729 final int snapshotTerm = -1;
730 final int currentTerm = 2;
732 // set the snapshot variables in replicatedlog
733 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
734 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
735 actorContext.setLastApplied(3);
736 actorContext.setCommitIndex(followersLastIndex);
738 actorContext.getReplicatedLog().removeFrom(0);
740 leader = new Leader(actorContext);
742 // Leader will send an immediate heartbeat - ignore it.
743 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
745 // set the snapshot as absent and check if capture-snapshot is invoked.
746 leader.setSnapshot(null);
748 for(int i=0;i<4;i++) {
749 actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
750 new MockRaftActorContext.MockPayload("X" + i)));
754 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
755 new MockRaftActorContext.MockPayload("D"));
757 actorContext.getReplicatedLog().append(entry);
759 //update follower timestamp
760 leader.markFollowerActive(FOLLOWER_ID);
762 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
763 // installed with a SendInstallSnapshot
764 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
766 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
768 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
770 assertTrue(cs.isInstallSnapshotInitiated());
771 assertEquals(3, cs.getLastAppliedIndex());
772 assertEquals(1, cs.getLastAppliedTerm());
773 assertEquals(4, cs.getLastIndex());
774 assertEquals(2, cs.getLastTerm());
776 // if an initiate is started again when first is in progress, it should not initiate Capture
777 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
779 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
784 public void testInstallSnapshot() throws Exception {
785 logStart("testInstallSnapshot");
787 MockRaftActorContext actorContext = createActorContextWithFollower();
789 Map<String, String> leadersSnapshot = new HashMap<>();
790 leadersSnapshot.put("1", "A");
791 leadersSnapshot.put("2", "B");
792 leadersSnapshot.put("3", "C");
795 actorContext.getReplicatedLog().removeFrom(0);
797 final int lastAppliedIndex = 3;
798 final int snapshotIndex = 2;
799 final int snapshotTerm = 1;
800 final int currentTerm = 2;
802 // set the snapshot variables in replicatedlog
803 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
804 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
805 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
806 actorContext.setCommitIndex(lastAppliedIndex);
807 actorContext.setLastApplied(lastAppliedIndex);
809 leader = new Leader(actorContext);
811 // Initial heartbeat.
812 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
814 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
815 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
817 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
818 Collections.<ReplicatedLogEntry>emptyList(),
819 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
821 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
823 assertTrue(raftBehavior instanceof Leader);
825 // check if installsnapshot gets called with the correct values.
827 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
829 assertNotNull(installSnapshot.getData());
830 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
831 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
833 assertEquals(currentTerm, installSnapshot.getTerm());
837 public void testForceInstallSnapshot() throws Exception {
838 logStart("testForceInstallSnapshot");
840 MockRaftActorContext actorContext = createActorContextWithFollower();
842 Map<String, String> leadersSnapshot = new HashMap<>();
843 leadersSnapshot.put("1", "A");
844 leadersSnapshot.put("2", "B");
845 leadersSnapshot.put("3", "C");
847 final int lastAppliedIndex = 3;
848 final int snapshotIndex = -1;
849 final int snapshotTerm = -1;
850 final int currentTerm = 2;
852 // set the snapshot variables in replicatedlog
853 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
854 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
855 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
856 actorContext.setCommitIndex(lastAppliedIndex);
857 actorContext.setLastApplied(lastAppliedIndex);
859 leader = new Leader(actorContext);
861 // Initial heartbeat.
862 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
864 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
865 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
867 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
868 Collections.<ReplicatedLogEntry>emptyList(),
869 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
871 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
873 assertTrue(raftBehavior instanceof Leader);
875 // check if installsnapshot gets called with the correct values.
877 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
879 assertNotNull(installSnapshot.getData());
880 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
881 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
883 assertEquals(currentTerm, installSnapshot.getTerm());
887 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
888 logStart("testHandleInstallSnapshotReplyLastChunk");
890 MockRaftActorContext actorContext = createActorContextWithFollower();
892 final int commitIndex = 3;
893 final int snapshotIndex = 2;
894 final int snapshotTerm = 1;
895 final int currentTerm = 2;
897 actorContext.setCommitIndex(commitIndex);
899 leader = new Leader(actorContext);
900 actorContext.setCurrentBehavior(leader);
902 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
903 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
905 // Ignore initial heartbeat.
906 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
908 Map<String, String> leadersSnapshot = new HashMap<>();
909 leadersSnapshot.put("1", "A");
910 leadersSnapshot.put("2", "B");
911 leadersSnapshot.put("3", "C");
913 // set the snapshot variables in replicatedlog
915 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
916 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
917 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
919 ByteString bs = toByteString(leadersSnapshot);
920 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
921 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
922 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
923 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
924 while(!fts.isLastChunk(fts.getChunkIndex())) {
926 fts.incrementChunkIndex();
930 actorContext.getReplicatedLog().removeFrom(0);
932 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
933 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
935 assertTrue(raftBehavior instanceof Leader);
937 assertEquals(0, leader.followerSnapshotSize());
938 assertEquals(1, leader.followerLogSize());
939 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
941 assertEquals(commitIndex, fli.getMatchIndex());
942 assertEquals(commitIndex + 1, fli.getNextIndex());
946 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
947 logStart("testSendSnapshotfromInstallSnapshotReply");
949 MockRaftActorContext actorContext = createActorContextWithFollower();
951 final int commitIndex = 3;
952 final int snapshotIndex = 2;
953 final int snapshotTerm = 1;
954 final int currentTerm = 2;
956 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
958 public int getSnapshotChunkSize() {
962 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
963 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
965 actorContext.setConfigParams(configParams);
966 actorContext.setCommitIndex(commitIndex);
968 leader = new Leader(actorContext);
969 actorContext.setCurrentBehavior(leader);
971 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
972 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
974 Map<String, String> leadersSnapshot = new HashMap<>();
975 leadersSnapshot.put("1", "A");
976 leadersSnapshot.put("2", "B");
977 leadersSnapshot.put("3", "C");
979 // set the snapshot variables in replicatedlog
980 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
981 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
982 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
984 ByteString bs = toByteString(leadersSnapshot);
985 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
986 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
987 leader.setSnapshot(snapshot);
989 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
991 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
993 assertEquals(1, installSnapshot.getChunkIndex());
994 assertEquals(3, installSnapshot.getTotalChunks());
996 followerActor.underlyingActor().clear();
997 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
998 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1000 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1002 assertEquals(2, installSnapshot.getChunkIndex());
1003 assertEquals(3, installSnapshot.getTotalChunks());
1005 followerActor.underlyingActor().clear();
1006 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1007 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1009 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1011 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1012 followerActor.underlyingActor().clear();
1013 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1014 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1016 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1018 assertNull(installSnapshot);
1023 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
1024 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1026 MockRaftActorContext actorContext = createActorContextWithFollower();
1028 final int commitIndex = 3;
1029 final int snapshotIndex = 2;
1030 final int snapshotTerm = 1;
1031 final int currentTerm = 2;
1033 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
1035 public int getSnapshotChunkSize() {
1040 actorContext.setCommitIndex(commitIndex);
1042 leader = new Leader(actorContext);
1044 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1045 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1047 Map<String, String> leadersSnapshot = new HashMap<>();
1048 leadersSnapshot.put("1", "A");
1049 leadersSnapshot.put("2", "B");
1050 leadersSnapshot.put("3", "C");
1052 // set the snapshot variables in replicatedlog
1053 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1054 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1055 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1057 ByteString bs = toByteString(leadersSnapshot);
1058 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1059 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1060 leader.setSnapshot(snapshot);
1062 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1063 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1065 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1067 assertEquals(1, installSnapshot.getChunkIndex());
1068 assertEquals(3, installSnapshot.getTotalChunks());
1070 followerActor.underlyingActor().clear();
1072 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1073 FOLLOWER_ID, -1, false));
1075 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1076 TimeUnit.MILLISECONDS);
1078 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1080 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1082 assertEquals(1, installSnapshot.getChunkIndex());
1083 assertEquals(3, installSnapshot.getTotalChunks());
1087 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1088 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1090 MockRaftActorContext actorContext = createActorContextWithFollower();
1092 final int commitIndex = 3;
1093 final int snapshotIndex = 2;
1094 final int snapshotTerm = 1;
1095 final int currentTerm = 2;
1097 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1099 public int getSnapshotChunkSize() {
1104 actorContext.setCommitIndex(commitIndex);
1106 leader = new Leader(actorContext);
1108 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1109 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1111 Map<String, String> leadersSnapshot = new HashMap<>();
1112 leadersSnapshot.put("1", "A");
1113 leadersSnapshot.put("2", "B");
1114 leadersSnapshot.put("3", "C");
1116 // set the snapshot variables in replicatedlog
1117 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1118 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1119 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1121 ByteString bs = toByteString(leadersSnapshot);
1122 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1123 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1124 leader.setSnapshot(snapshot);
1126 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1128 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1130 assertEquals(1, installSnapshot.getChunkIndex());
1131 assertEquals(3, installSnapshot.getTotalChunks());
1132 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
1134 int hashCode = Arrays.hashCode(installSnapshot.getData());
1136 followerActor.underlyingActor().clear();
1138 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1139 FOLLOWER_ID, 1, true));
1141 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1143 assertEquals(2, installSnapshot.getChunkIndex());
1144 assertEquals(3, installSnapshot.getTotalChunks());
1145 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1149 public void testFollowerToSnapshotLogic() {
1150 logStart("testFollowerToSnapshotLogic");
1152 MockRaftActorContext actorContext = createActorContext();
1154 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1156 public int getSnapshotChunkSize() {
1161 leader = new Leader(actorContext);
1163 Map<String, String> leadersSnapshot = new HashMap<>();
1164 leadersSnapshot.put("1", "A");
1165 leadersSnapshot.put("2", "B");
1166 leadersSnapshot.put("3", "C");
1168 ByteString bs = toByteString(leadersSnapshot);
1169 byte[] barray = bs.toByteArray();
1171 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1172 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1174 assertEquals(bs.size(), barray.length);
1177 for (int i=0; i < barray.length; i = i + 50) {
1181 if (i + 50 > barray.length) {
1185 byte[] chunk = fts.getNextChunk();
1186 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
1187 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1189 fts.markSendStatus(true);
1190 if (!fts.isLastChunk(chunkIndex)) {
1191 fts.incrementChunkIndex();
1195 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1199 protected Leader createBehavior(final RaftActorContext actorContext) {
1200 return new Leader(actorContext);
1204 protected MockRaftActorContext createActorContext() {
1205 return createActorContext(leaderActor);
1209 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1210 return createActorContext(LEADER_ID, actorRef);
1213 private MockRaftActorContext createActorContextWithFollower() {
1214 MockRaftActorContext actorContext = createActorContext();
1215 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1216 followerActor.path().toString()).build());
1217 return actorContext;
1220 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1221 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1222 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1223 configParams.setElectionTimeoutFactor(100000);
1224 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1225 context.setConfigParams(configParams);
1226 context.setPayloadVersion(payloadVersion);
1230 private MockRaftActorContext createFollowerActorContextWithLeader() {
1231 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1232 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1233 followerConfig.setElectionTimeoutFactor(10000);
1234 followerActorContext.setConfigParams(followerConfig);
1235 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1236 return followerActorContext;
1240 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1241 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1243 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1245 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1247 Follower follower = new Follower(followerActorContext);
1248 followerActor.underlyingActor().setBehavior(follower);
1249 followerActorContext.setCurrentBehavior(follower);
1251 Map<String, String> peerAddresses = new HashMap<>();
1252 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1254 leaderActorContext.setPeerAddresses(peerAddresses);
1256 leaderActorContext.getReplicatedLog().removeFrom(0);
1259 leaderActorContext.setReplicatedLog(
1260 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1262 leaderActorContext.setCommitIndex(1);
1264 followerActorContext.getReplicatedLog().removeFrom(0);
1266 // follower too has the exact same log entries and has the same commit index
1267 followerActorContext.setReplicatedLog(
1268 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1270 followerActorContext.setCommitIndex(1);
1272 leader = new Leader(leaderActorContext);
1273 leaderActorContext.setCurrentBehavior(leader);
1275 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1277 assertEquals(1, appendEntries.getLeaderCommit());
1278 assertEquals(0, appendEntries.getEntries().size());
1279 assertEquals(0, appendEntries.getPrevLogIndex());
1281 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1282 leaderActor, AppendEntriesReply.class);
1284 assertEquals(2, appendEntriesReply.getLogLastIndex());
1285 assertEquals(1, appendEntriesReply.getLogLastTerm());
1287 // follower returns its next index
1288 assertEquals(2, appendEntriesReply.getLogLastIndex());
1289 assertEquals(1, appendEntriesReply.getLogLastTerm());
1295 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1296 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1298 MockRaftActorContext leaderActorContext = createActorContext();
1300 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1301 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1303 Follower follower = new Follower(followerActorContext);
1304 followerActor.underlyingActor().setBehavior(follower);
1305 followerActorContext.setCurrentBehavior(follower);
1307 Map<String, String> leaderPeerAddresses = new HashMap<>();
1308 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1310 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1312 leaderActorContext.getReplicatedLog().removeFrom(0);
1314 leaderActorContext.setReplicatedLog(
1315 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1317 leaderActorContext.setCommitIndex(1);
1319 followerActorContext.getReplicatedLog().removeFrom(0);
1321 followerActorContext.setReplicatedLog(
1322 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1324 // follower has the same log entries but its commit index > leaders commit index
1325 followerActorContext.setCommitIndex(2);
1327 leader = new Leader(leaderActorContext);
1329 // Initial heartbeat
1330 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1332 assertEquals(1, appendEntries.getLeaderCommit());
1333 assertEquals(0, appendEntries.getEntries().size());
1334 assertEquals(0, appendEntries.getPrevLogIndex());
1336 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1337 leaderActor, AppendEntriesReply.class);
1339 assertEquals(2, appendEntriesReply.getLogLastIndex());
1340 assertEquals(1, appendEntriesReply.getLogLastTerm());
1342 leaderActor.underlyingActor().setBehavior(follower);
1343 leader.handleMessage(followerActor, appendEntriesReply);
1345 leaderActor.underlyingActor().clear();
1346 followerActor.underlyingActor().clear();
1348 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1349 TimeUnit.MILLISECONDS);
1351 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1353 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1355 assertEquals(2, appendEntries.getLeaderCommit());
1356 assertEquals(0, appendEntries.getEntries().size());
1357 assertEquals(2, appendEntries.getPrevLogIndex());
1359 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1361 assertEquals(2, appendEntriesReply.getLogLastIndex());
1362 assertEquals(1, appendEntriesReply.getLogLastTerm());
1364 assertEquals(2, followerActorContext.getCommitIndex());
1370 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1371 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1373 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1374 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1375 new FiniteDuration(1000, TimeUnit.SECONDS));
1377 leaderActorContext.setReplicatedLog(
1378 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1379 long leaderCommitIndex = 2;
1380 leaderActorContext.setCommitIndex(leaderCommitIndex);
1381 leaderActorContext.setLastApplied(leaderCommitIndex);
1383 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1384 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1386 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1388 followerActorContext.setReplicatedLog(
1389 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1390 followerActorContext.setCommitIndex(0);
1391 followerActorContext.setLastApplied(0);
1393 Follower follower = new Follower(followerActorContext);
1394 followerActor.underlyingActor().setBehavior(follower);
1396 leader = new Leader(leaderActorContext);
1398 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1399 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1401 MessageCollectorActor.clearMessages(followerActor);
1402 MessageCollectorActor.clearMessages(leaderActor);
1404 // Verify initial AppendEntries sent with the leader's current commit index.
1405 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1406 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1407 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1409 leaderActor.underlyingActor().setBehavior(leader);
1411 leader.handleMessage(followerActor, appendEntriesReply);
1413 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1414 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1416 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1417 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1418 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1420 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1421 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1422 appendEntries.getEntries().get(0).getData());
1423 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1424 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1425 appendEntries.getEntries().get(1).getData());
1427 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1428 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1430 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1432 ApplyState applyState = applyStateList.get(0);
1433 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1434 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1435 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1436 applyState.getReplicatedLogEntry().getData());
1438 applyState = applyStateList.get(1);
1439 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1440 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1441 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1442 applyState.getReplicatedLogEntry().getData());
1444 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1445 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1449 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1450 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1452 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1453 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1454 new FiniteDuration(1000, TimeUnit.SECONDS));
1456 leaderActorContext.setReplicatedLog(
1457 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1458 long leaderCommitIndex = 1;
1459 leaderActorContext.setCommitIndex(leaderCommitIndex);
1460 leaderActorContext.setLastApplied(leaderCommitIndex);
1462 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1463 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1465 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1467 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1468 followerActorContext.setCommitIndex(-1);
1469 followerActorContext.setLastApplied(-1);
1471 Follower follower = new Follower(followerActorContext);
1472 followerActor.underlyingActor().setBehavior(follower);
1473 followerActorContext.setCurrentBehavior(follower);
1475 leader = new Leader(leaderActorContext);
1477 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1478 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1480 MessageCollectorActor.clearMessages(followerActor);
1481 MessageCollectorActor.clearMessages(leaderActor);
1483 // Verify initial AppendEntries sent with the leader's current commit index.
1484 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1485 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1486 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1488 leaderActor.underlyingActor().setBehavior(leader);
1489 leaderActorContext.setCurrentBehavior(leader);
1491 leader.handleMessage(followerActor, appendEntriesReply);
1493 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1494 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1496 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1497 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1498 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1500 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1501 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1502 appendEntries.getEntries().get(0).getData());
1503 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1504 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1505 appendEntries.getEntries().get(1).getData());
1507 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1508 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1510 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1512 ApplyState applyState = applyStateList.get(0);
1513 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1514 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1515 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1516 applyState.getReplicatedLogEntry().getData());
1518 applyState = applyStateList.get(1);
1519 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1520 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1521 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1522 applyState.getReplicatedLogEntry().getData());
1524 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1525 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1529 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1530 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1532 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1533 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1534 new FiniteDuration(1000, TimeUnit.SECONDS));
1536 leaderActorContext.setReplicatedLog(
1537 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1538 long leaderCommitIndex = 1;
1539 leaderActorContext.setCommitIndex(leaderCommitIndex);
1540 leaderActorContext.setLastApplied(leaderCommitIndex);
1542 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1543 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1545 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1547 followerActorContext.setReplicatedLog(
1548 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1549 followerActorContext.setCommitIndex(-1);
1550 followerActorContext.setLastApplied(-1);
1552 Follower follower = new Follower(followerActorContext);
1553 followerActor.underlyingActor().setBehavior(follower);
1554 followerActorContext.setCurrentBehavior(follower);
1556 leader = new Leader(leaderActorContext);
1558 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1559 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1561 MessageCollectorActor.clearMessages(followerActor);
1562 MessageCollectorActor.clearMessages(leaderActor);
1564 // Verify initial AppendEntries sent with the leader's current commit index.
1565 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1566 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1567 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1569 leaderActor.underlyingActor().setBehavior(leader);
1570 leaderActorContext.setCurrentBehavior(leader);
1572 leader.handleMessage(followerActor, appendEntriesReply);
1574 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1575 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1577 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1578 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1579 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1581 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1582 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1583 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1584 appendEntries.getEntries().get(0).getData());
1585 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1586 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1587 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1588 appendEntries.getEntries().get(1).getData());
1590 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1591 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1593 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1595 ApplyState applyState = applyStateList.get(0);
1596 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1597 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1598 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1599 applyState.getReplicatedLogEntry().getData());
1601 applyState = applyStateList.get(1);
1602 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1603 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1604 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1605 applyState.getReplicatedLogEntry().getData());
1607 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1608 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1609 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1613 public void testHandleAppendEntriesReplyWithNewerTerm(){
1614 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1616 MockRaftActorContext leaderActorContext = createActorContext();
1617 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1618 new FiniteDuration(10000, TimeUnit.SECONDS));
1620 leaderActorContext.setReplicatedLog(
1621 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1623 leader = new Leader(leaderActorContext);
1624 leaderActor.underlyingActor().setBehavior(leader);
1625 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1627 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1629 assertEquals(false, appendEntriesReply.isSuccess());
1630 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1632 MessageCollectorActor.clearMessages(leaderActor);
1636 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1637 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1639 MockRaftActorContext leaderActorContext = createActorContext();
1640 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1641 new FiniteDuration(10000, TimeUnit.SECONDS));
1643 leaderActorContext.setReplicatedLog(
1644 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1645 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1647 leader = new Leader(leaderActorContext);
1648 leaderActor.underlyingActor().setBehavior(leader);
1649 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1651 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1653 assertEquals(false, appendEntriesReply.isSuccess());
1654 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1656 MessageCollectorActor.clearMessages(leaderActor);
1660 public void testHandleAppendEntriesReplySuccess() throws Exception {
1661 logStart("testHandleAppendEntriesReplySuccess");
1663 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1665 leaderActorContext.setReplicatedLog(
1666 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1668 leaderActorContext.setCommitIndex(1);
1669 leaderActorContext.setLastApplied(1);
1670 leaderActorContext.getTermInformation().update(1, "leader");
1672 leader = new Leader(leaderActorContext);
1674 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1676 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1677 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1679 short payloadVersion = 5;
1680 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1682 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1684 assertEquals(RaftState.Leader, raftActorBehavior.state());
1686 assertEquals(2, leaderActorContext.getCommitIndex());
1688 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1689 leaderActor, ApplyJournalEntries.class);
1691 assertEquals(2, leaderActorContext.getLastApplied());
1693 assertEquals(2, applyJournalEntries.getToIndex());
1695 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1698 assertEquals(1,applyStateList.size());
1700 ApplyState applyState = applyStateList.get(0);
1702 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1704 assertEquals(2, followerInfo.getMatchIndex());
1705 assertEquals(3, followerInfo.getNextIndex());
1706 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1707 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1711 public void testHandleAppendEntriesReplyUnknownFollower(){
1712 logStart("testHandleAppendEntriesReplyUnknownFollower");
1714 MockRaftActorContext leaderActorContext = createActorContext();
1716 leader = new Leader(leaderActorContext);
1718 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1720 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1722 assertEquals(RaftState.Leader, raftActorBehavior.state());
1726 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1727 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1729 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1730 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1731 new FiniteDuration(1000, TimeUnit.SECONDS));
1732 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1734 leaderActorContext.setReplicatedLog(
1735 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1736 long leaderCommitIndex = 3;
1737 leaderActorContext.setCommitIndex(leaderCommitIndex);
1738 leaderActorContext.setLastApplied(leaderCommitIndex);
1740 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1741 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1742 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1743 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1745 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1747 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1748 followerActorContext.setCommitIndex(-1);
1749 followerActorContext.setLastApplied(-1);
1751 Follower follower = new Follower(followerActorContext);
1752 followerActor.underlyingActor().setBehavior(follower);
1753 followerActorContext.setCurrentBehavior(follower);
1755 leader = new Leader(leaderActorContext);
1757 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1758 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1760 MessageCollectorActor.clearMessages(followerActor);
1761 MessageCollectorActor.clearMessages(leaderActor);
1763 // Verify initial AppendEntries sent with the leader's current commit index.
1764 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1765 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1766 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1768 leaderActor.underlyingActor().setBehavior(leader);
1769 leaderActorContext.setCurrentBehavior(leader);
1771 leader.handleMessage(followerActor, appendEntriesReply);
1773 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1774 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1776 appendEntries = appendEntriesList.get(0);
1777 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1778 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1779 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1781 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1782 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1783 appendEntries.getEntries().get(0).getData());
1784 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1785 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1786 appendEntries.getEntries().get(1).getData());
1788 appendEntries = appendEntriesList.get(1);
1789 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1790 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1791 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1793 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1794 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1795 appendEntries.getEntries().get(0).getData());
1796 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1797 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1798 appendEntries.getEntries().get(1).getData());
1800 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1801 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1803 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1805 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1806 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1810 public void testHandleRequestVoteReply(){
1811 logStart("testHandleRequestVoteReply");
1813 MockRaftActorContext leaderActorContext = createActorContext();
1815 leader = new Leader(leaderActorContext);
1817 // Should be a no-op.
1818 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1819 new RequestVoteReply(1, true));
1821 assertEquals(RaftState.Leader, raftActorBehavior.state());
1823 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1825 assertEquals(RaftState.Leader, raftActorBehavior.state());
1829 public void testIsolatedLeaderCheckNoFollowers() {
1830 logStart("testIsolatedLeaderCheckNoFollowers");
1832 MockRaftActorContext leaderActorContext = createActorContext();
1834 leader = new Leader(leaderActorContext);
1835 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1836 assertTrue(behavior instanceof Leader);
1840 public void testIsolatedLeaderCheckNoVotingFollowers() {
1841 logStart("testIsolatedLeaderCheckNoVotingFollowers");
1843 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1844 Follower follower = new Follower(followerActorContext);
1845 followerActor.underlyingActor().setBehavior(follower);
1847 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1848 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1849 new FiniteDuration(1000, TimeUnit.SECONDS));
1850 leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1852 leader = new Leader(leaderActorContext);
1853 leader.getFollower(FOLLOWER_ID).markFollowerActive();
1854 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1855 assertTrue("Expected Leader", behavior instanceof Leader);
1858 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1859 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1860 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1862 MockRaftActorContext leaderActorContext = createActorContext();
1864 Map<String, String> peerAddresses = new HashMap<>();
1865 peerAddresses.put("follower-1", followerActor1.path().toString());
1866 peerAddresses.put("follower-2", followerActor2.path().toString());
1868 leaderActorContext.setPeerAddresses(peerAddresses);
1869 leaderActorContext.setRaftPolicy(raftPolicy);
1871 leader = new Leader(leaderActorContext);
1873 leader.markFollowerActive("follower-1");
1874 leader.markFollowerActive("follower-2");
1875 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1876 assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
1878 // kill 1 follower and verify if that got killed
1879 final JavaTestKit probe = new JavaTestKit(getSystem());
1880 probe.watch(followerActor1);
1881 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1882 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1883 assertEquals(termMsg1.getActor(), followerActor1);
1885 leader.markFollowerInActive("follower-1");
1886 leader.markFollowerActive("follower-2");
1887 behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1888 assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
1890 // kill 2nd follower and leader should change to Isolated leader
1891 followerActor2.tell(PoisonPill.getInstance(), null);
1892 probe.watch(followerActor2);
1893 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1894 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1895 assertEquals(termMsg2.getActor(), followerActor2);
1897 leader.markFollowerInActive("follower-2");
1898 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1902 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1903 logStart("testIsolatedLeaderCheckTwoFollowers");
1905 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1907 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1908 behavior instanceof IsolatedLeader);
1912 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1913 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1915 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1917 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1918 behavior instanceof Leader);
1922 public void testLaggingFollowerStarvation() throws Exception {
1923 logStart("testLaggingFollowerStarvation");
1924 new JavaTestKit(getSystem()) {{
1925 String leaderActorId = actorFactory.generateActorId("leader");
1926 String follower1ActorId = actorFactory.generateActorId("follower");
1927 String follower2ActorId = actorFactory.generateActorId("follower");
1929 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1930 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1931 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1932 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1934 MockRaftActorContext leaderActorContext =
1935 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1937 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1938 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1939 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1941 leaderActorContext.setConfigParams(configParams);
1943 leaderActorContext.setReplicatedLog(
1944 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1946 Map<String, String> peerAddresses = new HashMap<>();
1947 peerAddresses.put(follower1ActorId,
1948 follower1Actor.path().toString());
1949 peerAddresses.put(follower2ActorId,
1950 follower2Actor.path().toString());
1952 leaderActorContext.setPeerAddresses(peerAddresses);
1953 leaderActorContext.getTermInformation().update(1, leaderActorId);
1955 RaftActorBehavior leader = createBehavior(leaderActorContext);
1957 leaderActor.underlyingActor().setBehavior(leader);
1959 for(int i=1;i<6;i++) {
1960 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1961 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1962 assertTrue(newBehavior == leader);
1963 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1966 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1967 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1969 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1970 heartbeats.size() > 1);
1972 // Check if follower-2 got AppendEntries during this time and was not starved
1973 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1975 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1976 appendEntries.size() > 1);
1982 public void testReplicationConsensusWithNonVotingFollower() {
1983 logStart("testReplicationConsensusWithNonVotingFollower");
1985 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1986 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1987 new FiniteDuration(1000, TimeUnit.SECONDS));
1989 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1991 String nonVotingFollowerId = "nonvoting-follower";
1992 TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1993 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1995 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1997 leader = new Leader(leaderActorContext);
1998 leaderActorContext.setCurrentBehavior(leader);
2000 // Ignore initial heartbeats
2001 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2002 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2004 MessageCollectorActor.clearMessages(followerActor);
2005 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2006 MessageCollectorActor.clearMessages(leaderActor);
2008 // Send a Replicate message and wait for AppendEntries.
2009 sendReplicate(leaderActorContext, 0);
2011 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2012 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2014 // Send reply only from the voting follower and verify consensus via ApplyState.
2015 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2017 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2019 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2021 MessageCollectorActor.clearMessages(followerActor);
2022 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2023 MessageCollectorActor.clearMessages(leaderActor);
2025 // Send another Replicate message
2026 sendReplicate(leaderActorContext, 1);
2028 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2029 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2030 AppendEntries.class);
2031 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2032 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2034 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2035 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2037 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2039 // Send reply from the voting follower and verify consensus.
2040 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2042 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2046 public void testTransferLeadershipWithFollowerInSync() {
2047 logStart("testTransferLeadershipWithFollowerInSync");
2049 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2050 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2051 new FiniteDuration(1000, TimeUnit.SECONDS));
2052 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2054 leader = new Leader(leaderActorContext);
2055 leaderActorContext.setCurrentBehavior(leader);
2057 // Initial heartbeat
2058 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2059 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2060 MessageCollectorActor.clearMessages(followerActor);
2062 sendReplicate(leaderActorContext, 0);
2063 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2065 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2066 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2067 MessageCollectorActor.clearMessages(followerActor);
2069 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2070 leader.transferLeadership(mockTransferCohort);
2072 verify(mockTransferCohort, never()).transferComplete();
2073 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2074 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2076 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2077 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2079 // Leader should force an election timeout
2080 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2082 verify(mockTransferCohort).transferComplete();
2086 public void testTransferLeadershipWithEmptyLog() {
2087 logStart("testTransferLeadershipWithEmptyLog");
2089 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2090 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2091 new FiniteDuration(1000, TimeUnit.SECONDS));
2092 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2094 leader = new Leader(leaderActorContext);
2095 leaderActorContext.setCurrentBehavior(leader);
2097 // Initial heartbeat
2098 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2099 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2100 MessageCollectorActor.clearMessages(followerActor);
2102 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2103 leader.transferLeadership(mockTransferCohort);
2105 verify(mockTransferCohort, never()).transferComplete();
2106 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2107 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2109 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2110 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2112 // Leader should force an election timeout
2113 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2115 verify(mockTransferCohort).transferComplete();
2119 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2120 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2122 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2123 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2124 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2126 leader = new Leader(leaderActorContext);
2127 leaderActorContext.setCurrentBehavior(leader);
2129 // Initial heartbeat
2130 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2131 MessageCollectorActor.clearMessages(followerActor);
2133 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2134 leader.transferLeadership(mockTransferCohort);
2136 verify(mockTransferCohort, never()).transferComplete();
2138 // Sync up the follower.
2139 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2140 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2141 MessageCollectorActor.clearMessages(followerActor);
2143 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2144 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2145 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2146 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2147 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2149 // Leader should force an election timeout
2150 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2152 verify(mockTransferCohort).transferComplete();
2156 public void testTransferLeadershipWithFollowerSyncTimeout() {
2157 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2159 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2160 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2161 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2162 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2163 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2165 leader = new Leader(leaderActorContext);
2166 leaderActorContext.setCurrentBehavior(leader);
2168 // Initial heartbeat
2169 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2170 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2171 MessageCollectorActor.clearMessages(followerActor);
2173 sendReplicate(leaderActorContext, 0);
2174 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2176 MessageCollectorActor.clearMessages(followerActor);
2178 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2179 leader.transferLeadership(mockTransferCohort);
2181 verify(mockTransferCohort, never()).transferComplete();
2183 // Send heartbeats to time out the transfer.
2184 for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2185 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2186 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2187 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2190 verify(mockTransferCohort).abortTransfer();
2191 verify(mockTransferCohort, never()).transferComplete();
2192 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2196 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2197 ActorRef actorRef, RaftRPC rpc) throws Exception {
2198 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2199 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2202 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2204 private final long electionTimeOutIntervalMillis;
2205 private final int snapshotChunkSize;
2207 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2209 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2210 this.snapshotChunkSize = snapshotChunkSize;
2214 public FiniteDuration getElectionTimeOutInterval() {
2215 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2219 public int getSnapshotChunkSize() {
2220 return snapshotChunkSize;