2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.PoisonPill;
22 import akka.actor.Props;
23 import akka.actor.Terminated;
24 import akka.testkit.JavaTestKit;
25 import akka.testkit.TestActorRef;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.util.concurrent.Uninterruptibles;
28 import com.google.protobuf.ByteString;
29 import java.util.Arrays;
30 import java.util.Collections;
31 import java.util.HashMap;
32 import java.util.List;
34 import java.util.concurrent.TimeUnit;
35 import org.junit.After;
36 import org.junit.Test;
37 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
38 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
39 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
40 import org.opendaylight.controller.cluster.raft.RaftActorContext;
41 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
42 import org.opendaylight.controller.cluster.raft.RaftState;
43 import org.opendaylight.controller.cluster.raft.RaftVersions;
44 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
45 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
46 import org.opendaylight.controller.cluster.raft.Snapshot;
47 import org.opendaylight.controller.cluster.raft.VotingState;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
49 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
50 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
51 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
54 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
57 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
59 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
60 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
61 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
62 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
63 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
64 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
65 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
66 import org.opendaylight.yangtools.concepts.Identifier;
67 import scala.concurrent.duration.FiniteDuration;
69 public class LeaderTest extends AbstractLeaderTest<Leader> {
71 static final String FOLLOWER_ID = "follower";
72 public static final String LEADER_ID = "leader";
74 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
75 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
77 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
78 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
80 private Leader leader;
81 private final short payloadVersion = 5;
85 public void tearDown() throws Exception {
94 public void testHandleMessageForUnknownMessage() throws Exception {
95 logStart("testHandleMessageForUnknownMessage");
97 leader = new Leader(createActorContext());
99 // handle message should null when it receives an unknown message
100 assertNull(leader.handleMessage(followerActor, "foo"));
104 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
105 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
107 MockRaftActorContext actorContext = createActorContextWithFollower();
108 actorContext.setCommitIndex(-1);
109 short payloadVersion = (short)5;
110 actorContext.setPayloadVersion(payloadVersion);
113 actorContext.getTermInformation().update(term, "");
115 leader = new Leader(actorContext);
116 actorContext.setCurrentBehavior(leader);
118 // Leader should send an immediate heartbeat with no entries as follower is inactive.
119 long lastIndex = actorContext.getReplicatedLog().lastIndex();
120 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
121 assertEquals("getTerm", term, appendEntries.getTerm());
122 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
123 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
124 assertEquals("Entries size", 0, appendEntries.getEntries().size());
125 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
127 // The follower would normally reply - simulate that explicitly here.
128 leader.handleMessage(followerActor, new AppendEntriesReply(
129 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
130 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
132 followerActor.underlyingActor().clear();
134 // Sleep for the heartbeat interval so AppendEntries is sent.
135 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
136 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
138 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
140 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
141 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
142 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
143 assertEquals("Entries size", 1, appendEntries.getEntries().size());
144 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
145 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
146 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
150 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
151 return sendReplicate(actorContext, 1, index);
154 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
155 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
156 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
157 term, index, payload);
158 actorContext.getReplicatedLog().append(newEntry);
159 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
163 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
164 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
166 MockRaftActorContext actorContext = createActorContextWithFollower();
169 actorContext.getTermInformation().update(term, "");
171 leader = new Leader(actorContext);
173 // Leader will send an immediate heartbeat - ignore it.
174 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
176 // The follower would normally reply - simulate that explicitly here.
177 long lastIndex = actorContext.getReplicatedLog().lastIndex();
178 leader.handleMessage(followerActor, new AppendEntriesReply(
179 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
180 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
182 followerActor.underlyingActor().clear();
184 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
186 // State should not change
187 assertTrue(raftBehavior instanceof Leader);
189 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
190 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
191 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
192 assertEquals("Entries size", 1, appendEntries.getEntries().size());
193 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
194 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
195 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
196 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
200 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
201 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
203 MockRaftActorContext actorContext = createActorContextWithFollower();
204 actorContext.setCommitIndex(-1);
205 actorContext.setLastApplied(-1);
207 // The raft context is initialized with a couple log entries. However the commitIndex
208 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
209 // committed and applied. Now it regains leadership with a higher term (2).
210 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
211 long newTerm = prevTerm + 1;
212 actorContext.getTermInformation().update(newTerm, "");
214 leader = new Leader(actorContext);
215 actorContext.setCurrentBehavior(leader);
217 // Leader will send an immediate heartbeat - ignore it.
218 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
220 // The follower replies with the leader's current last index and term, simulating that it is
221 // up to date with the leader.
222 long lastIndex = actorContext.getReplicatedLog().lastIndex();
223 leader.handleMessage(followerActor, new AppendEntriesReply(
224 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
226 // The commit index should not get updated even though consensus was reached. This is b/c the
227 // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
228 // from previous terms by counting replicas".
229 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
231 followerActor.underlyingActor().clear();
233 // Now replicate a new entry with the new term 2.
234 long newIndex = lastIndex + 1;
235 sendReplicate(actorContext, newTerm, newIndex);
237 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
238 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
239 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
240 assertEquals("Entries size", 1, appendEntries.getEntries().size());
241 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
242 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
243 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
245 // The follower replies with success. The leader should now update the commit index to the new index
246 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
247 // prior entries are committed indirectly".
248 leader.handleMessage(followerActor, new AppendEntriesReply(
249 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
251 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
255 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
256 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
258 MockRaftActorContext actorContext = createActorContextWithFollower();
259 actorContext.setRaftPolicy(createRaftPolicy(true, true));
262 actorContext.getTermInformation().update(term, "");
264 leader = new Leader(actorContext);
266 // Leader will send an immediate heartbeat - ignore it.
267 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
269 // The follower would normally reply - simulate that explicitly here.
270 long lastIndex = actorContext.getReplicatedLog().lastIndex();
271 leader.handleMessage(followerActor, new AppendEntriesReply(
272 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
273 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
275 followerActor.underlyingActor().clear();
277 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
279 // State should not change
280 assertTrue(raftBehavior instanceof Leader);
282 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
283 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
284 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
285 assertEquals("Entries size", 1, appendEntries.getEntries().size());
286 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
287 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
288 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
289 assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
293 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
294 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
296 MockRaftActorContext actorContext = createActorContextWithFollower();
297 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
299 public FiniteDuration getHeartBeatInterval() {
300 return FiniteDuration.apply(5, TimeUnit.SECONDS);
305 actorContext.getTermInformation().update(term, "");
307 leader = new Leader(actorContext);
309 // Leader will send an immediate heartbeat - ignore it.
310 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
312 // The follower would normally reply - simulate that explicitly here.
313 long lastIndex = actorContext.getReplicatedLog().lastIndex();
314 leader.handleMessage(followerActor, new AppendEntriesReply(
315 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
316 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
318 followerActor.underlyingActor().clear();
320 for(int i=0;i<5;i++) {
321 sendReplicate(actorContext, lastIndex+i+1);
324 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
325 // We expect only 1 message to be sent because of two reasons,
326 // - an append entries reply was not received
327 // - the heartbeat interval has not expired
328 // In this scenario if multiple messages are sent they would likely be duplicates
329 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
333 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
334 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
336 MockRaftActorContext actorContext = createActorContextWithFollower();
337 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
339 public FiniteDuration getHeartBeatInterval() {
340 return FiniteDuration.apply(5, TimeUnit.SECONDS);
345 actorContext.getTermInformation().update(term, "");
347 leader = new Leader(actorContext);
349 // Leader will send an immediate heartbeat - ignore it.
350 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
352 // The follower would normally reply - simulate that explicitly here.
353 long lastIndex = actorContext.getReplicatedLog().lastIndex();
354 leader.handleMessage(followerActor, new AppendEntriesReply(
355 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
356 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
358 followerActor.underlyingActor().clear();
360 for(int i=0;i<3;i++) {
361 sendReplicate(actorContext, lastIndex+i+1);
362 leader.handleMessage(followerActor, new AppendEntriesReply(
363 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
367 for(int i=3;i<5;i++) {
368 sendReplicate(actorContext, lastIndex + i + 1);
371 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
372 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
373 // get sent to the follower - but not the 5th
374 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
376 for(int i=0;i<4;i++) {
377 long expected = allMessages.get(i).getEntries().get(0).getIndex();
378 assertEquals(expected, i+2);
383 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
384 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
386 MockRaftActorContext actorContext = createActorContextWithFollower();
387 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
389 public FiniteDuration getHeartBeatInterval() {
390 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
395 actorContext.getTermInformation().update(term, "");
397 leader = new Leader(actorContext);
399 // Leader will send an immediate heartbeat - ignore it.
400 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
402 // The follower would normally reply - simulate that explicitly here.
403 long lastIndex = actorContext.getReplicatedLog().lastIndex();
404 leader.handleMessage(followerActor, new AppendEntriesReply(
405 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
406 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
408 followerActor.underlyingActor().clear();
410 sendReplicate(actorContext, lastIndex+1);
412 // Wait slightly longer than heartbeat duration
413 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
415 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
417 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
418 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
420 assertEquals(1, allMessages.get(0).getEntries().size());
421 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
422 assertEquals(1, allMessages.get(1).getEntries().size());
423 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
428 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
429 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
431 MockRaftActorContext actorContext = createActorContextWithFollower();
432 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
434 public FiniteDuration getHeartBeatInterval() {
435 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
440 actorContext.getTermInformation().update(term, "");
442 leader = new Leader(actorContext);
444 // Leader will send an immediate heartbeat - ignore it.
445 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
447 // The follower would normally reply - simulate that explicitly here.
448 long lastIndex = actorContext.getReplicatedLog().lastIndex();
449 leader.handleMessage(followerActor, new AppendEntriesReply(
450 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
451 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
453 followerActor.underlyingActor().clear();
455 for(int i=0;i<3;i++) {
456 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
457 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
460 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
461 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
465 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
466 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
468 MockRaftActorContext actorContext = createActorContextWithFollower();
469 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
471 public FiniteDuration getHeartBeatInterval() {
472 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
477 actorContext.getTermInformation().update(term, "");
479 leader = new Leader(actorContext);
481 // Leader will send an immediate heartbeat - ignore it.
482 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
484 // The follower would normally reply - simulate that explicitly here.
485 long lastIndex = actorContext.getReplicatedLog().lastIndex();
486 leader.handleMessage(followerActor, new AppendEntriesReply(
487 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
488 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
490 followerActor.underlyingActor().clear();
492 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
493 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
494 sendReplicate(actorContext, lastIndex+1);
496 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
497 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
499 assertEquals(0, allMessages.get(0).getEntries().size());
500 assertEquals(1, allMessages.get(1).getEntries().size());
505 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
506 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
508 MockRaftActorContext actorContext = createActorContext();
510 leader = new Leader(actorContext);
512 actorContext.setLastApplied(0);
514 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
515 long term = actorContext.getTermInformation().getCurrentTerm();
516 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
517 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
519 actorContext.getReplicatedLog().append(newEntry);
521 final Identifier id = new MockIdentifier("state-id");
522 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
524 // State should not change
525 assertTrue(raftBehavior instanceof Leader);
527 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
529 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
530 // one since lastApplied state is 0.
531 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
532 leaderActor, ApplyState.class);
533 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
535 for(int i = 0; i <= newLogIndex - 1; i++ ) {
536 ApplyState applyState = applyStateList.get(i);
537 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
538 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
541 ApplyState last = applyStateList.get((int) newLogIndex - 1);
542 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
543 assertEquals("getIdentifier", id, last.getIdentifier());
547 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
548 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
550 MockRaftActorContext actorContext = createActorContextWithFollower();
552 Map<String, String> leadersSnapshot = new HashMap<>();
553 leadersSnapshot.put("1", "A");
554 leadersSnapshot.put("2", "B");
555 leadersSnapshot.put("3", "C");
558 actorContext.getReplicatedLog().removeFrom(0);
560 final int commitIndex = 3;
561 final int snapshotIndex = 2;
562 final int newEntryIndex = 4;
563 final int snapshotTerm = 1;
564 final int currentTerm = 2;
566 // set the snapshot variables in replicatedlog
567 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
568 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
569 actorContext.setCommitIndex(commitIndex);
570 //set follower timeout to 2 mins, helps during debugging
571 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
573 leader = new Leader(actorContext);
575 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
576 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
579 ReplicatedLogImplEntry entry =
580 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
581 new MockRaftActorContext.MockPayload("D"));
583 //update follower timestamp
584 leader.markFollowerActive(FOLLOWER_ID);
586 ByteString bs = toByteString(leadersSnapshot);
587 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
588 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
589 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
590 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
591 fts.setSnapshotBytes(bs);
592 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
594 //send first chunk and no InstallSnapshotReply received yet
596 fts.incrementChunkIndex();
598 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
599 TimeUnit.MILLISECONDS);
601 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
603 AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
605 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
607 //InstallSnapshotReply received
608 fts.markSendStatus(true);
610 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
612 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
614 assertEquals(commitIndex, is.getLastIncludedIndex());
618 public void testSendAppendEntriesSnapshotScenario() throws Exception {
619 logStart("testSendAppendEntriesSnapshotScenario");
621 MockRaftActorContext actorContext = createActorContextWithFollower();
623 Map<String, String> leadersSnapshot = new HashMap<>();
624 leadersSnapshot.put("1", "A");
625 leadersSnapshot.put("2", "B");
626 leadersSnapshot.put("3", "C");
629 actorContext.getReplicatedLog().removeFrom(0);
631 final int followersLastIndex = 2;
632 final int snapshotIndex = 3;
633 final int newEntryIndex = 4;
634 final int snapshotTerm = 1;
635 final int currentTerm = 2;
637 // set the snapshot variables in replicatedlog
638 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
639 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
640 actorContext.setCommitIndex(followersLastIndex);
642 leader = new Leader(actorContext);
644 // Leader will send an immediate heartbeat - ignore it.
645 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
648 ReplicatedLogImplEntry entry =
649 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
650 new MockRaftActorContext.MockPayload("D"));
652 actorContext.getReplicatedLog().append(entry);
654 //update follower timestamp
655 leader.markFollowerActive(FOLLOWER_ID);
657 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
658 RaftActorBehavior raftBehavior = leader.handleMessage(
659 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
661 assertTrue(raftBehavior instanceof Leader);
663 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
667 public void testInitiateInstallSnapshot() throws Exception {
668 logStart("testInitiateInstallSnapshot");
670 MockRaftActorContext actorContext = createActorContextWithFollower();
673 actorContext.getReplicatedLog().removeFrom(0);
675 final int followersLastIndex = 2;
676 final int snapshotIndex = 3;
677 final int newEntryIndex = 4;
678 final int snapshotTerm = 1;
679 final int currentTerm = 2;
681 // set the snapshot variables in replicatedlog
682 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
683 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
684 actorContext.setLastApplied(3);
685 actorContext.setCommitIndex(followersLastIndex);
687 leader = new Leader(actorContext);
689 // Leader will send an immediate heartbeat - ignore it.
690 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
692 // set the snapshot as absent and check if capture-snapshot is invoked.
693 leader.setSnapshot(null);
696 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
697 new MockRaftActorContext.MockPayload("D"));
699 actorContext.getReplicatedLog().append(entry);
701 //update follower timestamp
702 leader.markFollowerActive(FOLLOWER_ID);
704 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
706 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
708 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
710 assertTrue(cs.isInstallSnapshotInitiated());
711 assertEquals(3, cs.getLastAppliedIndex());
712 assertEquals(1, cs.getLastAppliedTerm());
713 assertEquals(4, cs.getLastIndex());
714 assertEquals(2, cs.getLastTerm());
716 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
717 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
719 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
723 public void testInitiateForceInstallSnapshot() throws Exception {
724 logStart("testInitiateForceInstallSnapshot");
726 MockRaftActorContext actorContext = createActorContextWithFollower();
728 final int followersLastIndex = 2;
729 final int snapshotIndex = -1;
730 final int newEntryIndex = 4;
731 final int snapshotTerm = -1;
732 final int currentTerm = 2;
734 // set the snapshot variables in replicatedlog
735 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
736 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
737 actorContext.setLastApplied(3);
738 actorContext.setCommitIndex(followersLastIndex);
740 actorContext.getReplicatedLog().removeFrom(0);
742 leader = new Leader(actorContext);
743 actorContext.setCurrentBehavior(leader);
745 // Leader will send an immediate heartbeat - ignore it.
746 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
748 // set the snapshot as absent and check if capture-snapshot is invoked.
749 leader.setSnapshot(null);
751 for(int i=0;i<4;i++) {
752 actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
753 new MockRaftActorContext.MockPayload("X" + i)));
757 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
758 new MockRaftActorContext.MockPayload("D"));
760 actorContext.getReplicatedLog().append(entry);
762 //update follower timestamp
763 leader.markFollowerActive(FOLLOWER_ID);
765 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
766 // installed with a SendInstallSnapshot
767 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
769 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
771 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
773 assertTrue(cs.isInstallSnapshotInitiated());
774 assertEquals(3, cs.getLastAppliedIndex());
775 assertEquals(1, cs.getLastAppliedTerm());
776 assertEquals(4, cs.getLastIndex());
777 assertEquals(2, cs.getLastTerm());
779 // if an initiate is started again when first is in progress, it should not initiate Capture
780 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
782 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
787 public void testInstallSnapshot() throws Exception {
788 logStart("testInstallSnapshot");
790 MockRaftActorContext actorContext = createActorContextWithFollower();
792 Map<String, String> leadersSnapshot = new HashMap<>();
793 leadersSnapshot.put("1", "A");
794 leadersSnapshot.put("2", "B");
795 leadersSnapshot.put("3", "C");
798 actorContext.getReplicatedLog().removeFrom(0);
800 final int lastAppliedIndex = 3;
801 final int snapshotIndex = 2;
802 final int snapshotTerm = 1;
803 final int currentTerm = 2;
805 // set the snapshot variables in replicatedlog
806 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
807 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
808 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
809 actorContext.setCommitIndex(lastAppliedIndex);
810 actorContext.setLastApplied(lastAppliedIndex);
812 leader = new Leader(actorContext);
814 // Initial heartbeat.
815 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
817 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
818 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
820 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
821 Collections.<ReplicatedLogEntry>emptyList(),
822 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
824 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
826 assertTrue(raftBehavior instanceof Leader);
828 // check if installsnapshot gets called with the correct values.
830 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
832 assertNotNull(installSnapshot.getData());
833 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
834 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
836 assertEquals(currentTerm, installSnapshot.getTerm());
840 public void testForceInstallSnapshot() throws Exception {
841 logStart("testForceInstallSnapshot");
843 MockRaftActorContext actorContext = createActorContextWithFollower();
845 Map<String, String> leadersSnapshot = new HashMap<>();
846 leadersSnapshot.put("1", "A");
847 leadersSnapshot.put("2", "B");
848 leadersSnapshot.put("3", "C");
850 final int lastAppliedIndex = 3;
851 final int snapshotIndex = -1;
852 final int snapshotTerm = -1;
853 final int currentTerm = 2;
855 // set the snapshot variables in replicatedlog
856 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
857 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
858 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
859 actorContext.setCommitIndex(lastAppliedIndex);
860 actorContext.setLastApplied(lastAppliedIndex);
862 leader = new Leader(actorContext);
864 // Initial heartbeat.
865 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
867 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
868 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
870 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
871 Collections.<ReplicatedLogEntry>emptyList(),
872 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
874 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
876 assertTrue(raftBehavior instanceof Leader);
878 // check if installsnapshot gets called with the correct values.
880 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
882 assertNotNull(installSnapshot.getData());
883 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
884 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
886 assertEquals(currentTerm, installSnapshot.getTerm());
890 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
891 logStart("testHandleInstallSnapshotReplyLastChunk");
893 MockRaftActorContext actorContext = createActorContextWithFollower();
895 final int commitIndex = 3;
896 final int snapshotIndex = 2;
897 final int snapshotTerm = 1;
898 final int currentTerm = 2;
900 actorContext.setCommitIndex(commitIndex);
902 leader = new Leader(actorContext);
903 actorContext.setCurrentBehavior(leader);
905 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
906 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
908 // Ignore initial heartbeat.
909 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
911 Map<String, String> leadersSnapshot = new HashMap<>();
912 leadersSnapshot.put("1", "A");
913 leadersSnapshot.put("2", "B");
914 leadersSnapshot.put("3", "C");
916 // set the snapshot variables in replicatedlog
918 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
919 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
920 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
922 ByteString bs = toByteString(leadersSnapshot);
923 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
924 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
925 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
926 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
927 fts.setSnapshotBytes(bs);
928 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
929 while(!fts.isLastChunk(fts.getChunkIndex())) {
931 fts.incrementChunkIndex();
935 actorContext.getReplicatedLog().removeFrom(0);
937 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
938 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
940 assertTrue(raftBehavior instanceof Leader);
942 assertEquals(1, leader.followerLogSize());
943 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
945 assertNull(fli.getInstallSnapshotState());
946 assertEquals(commitIndex, fli.getMatchIndex());
947 assertEquals(commitIndex + 1, fli.getNextIndex());
948 assertFalse(leader.hasSnapshot());
952 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
953 logStart("testSendSnapshotfromInstallSnapshotReply");
955 MockRaftActorContext actorContext = createActorContextWithFollower();
957 final int commitIndex = 3;
958 final int snapshotIndex = 2;
959 final int snapshotTerm = 1;
960 final int currentTerm = 2;
962 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
964 public int getSnapshotChunkSize() {
968 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
969 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
971 actorContext.setConfigParams(configParams);
972 actorContext.setCommitIndex(commitIndex);
974 leader = new Leader(actorContext);
975 actorContext.setCurrentBehavior(leader);
977 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
978 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
980 Map<String, String> leadersSnapshot = new HashMap<>();
981 leadersSnapshot.put("1", "A");
982 leadersSnapshot.put("2", "B");
983 leadersSnapshot.put("3", "C");
985 // set the snapshot variables in replicatedlog
986 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
987 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
988 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
990 ByteString bs = toByteString(leadersSnapshot);
991 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
992 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
993 leader.setSnapshot(snapshot);
995 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
997 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
999 assertEquals(1, installSnapshot.getChunkIndex());
1000 assertEquals(3, installSnapshot.getTotalChunks());
1002 followerActor.underlyingActor().clear();
1003 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1004 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1006 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1008 assertEquals(2, installSnapshot.getChunkIndex());
1009 assertEquals(3, installSnapshot.getTotalChunks());
1011 followerActor.underlyingActor().clear();
1012 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1013 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1015 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1017 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1018 followerActor.underlyingActor().clear();
1019 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1020 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1022 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1024 assertNull(installSnapshot);
1029 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
1030 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1032 MockRaftActorContext actorContext = createActorContextWithFollower();
1034 final int commitIndex = 3;
1035 final int snapshotIndex = 2;
1036 final int snapshotTerm = 1;
1037 final int currentTerm = 2;
1039 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
1041 public int getSnapshotChunkSize() {
1046 actorContext.setCommitIndex(commitIndex);
1048 leader = new Leader(actorContext);
1050 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1051 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1053 Map<String, String> leadersSnapshot = new HashMap<>();
1054 leadersSnapshot.put("1", "A");
1055 leadersSnapshot.put("2", "B");
1056 leadersSnapshot.put("3", "C");
1058 // set the snapshot variables in replicatedlog
1059 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1060 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1061 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1063 ByteString bs = toByteString(leadersSnapshot);
1064 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1065 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1066 leader.setSnapshot(snapshot);
1068 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1069 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1071 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1073 assertEquals(1, installSnapshot.getChunkIndex());
1074 assertEquals(3, installSnapshot.getTotalChunks());
1076 followerActor.underlyingActor().clear();
1078 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1079 FOLLOWER_ID, -1, false));
1081 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1082 TimeUnit.MILLISECONDS);
1084 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1086 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1088 assertEquals(1, installSnapshot.getChunkIndex());
1089 assertEquals(3, installSnapshot.getTotalChunks());
1093 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1094 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1096 MockRaftActorContext actorContext = createActorContextWithFollower();
1098 final int commitIndex = 3;
1099 final int snapshotIndex = 2;
1100 final int snapshotTerm = 1;
1101 final int currentTerm = 2;
1103 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1105 public int getSnapshotChunkSize() {
1110 actorContext.setCommitIndex(commitIndex);
1112 leader = new Leader(actorContext);
1114 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1115 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1117 Map<String, String> leadersSnapshot = new HashMap<>();
1118 leadersSnapshot.put("1", "A");
1119 leadersSnapshot.put("2", "B");
1120 leadersSnapshot.put("3", "C");
1122 // set the snapshot variables in replicatedlog
1123 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1124 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1125 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1127 ByteString bs = toByteString(leadersSnapshot);
1128 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1129 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1130 leader.setSnapshot(snapshot);
1132 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1134 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1136 assertEquals(1, installSnapshot.getChunkIndex());
1137 assertEquals(3, installSnapshot.getTotalChunks());
1138 assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1139 installSnapshot.getLastChunkHashCode().get().intValue());
1141 int hashCode = Arrays.hashCode(installSnapshot.getData());
1143 followerActor.underlyingActor().clear();
1145 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1146 FOLLOWER_ID, 1, true));
1148 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1150 assertEquals(2, installSnapshot.getChunkIndex());
1151 assertEquals(3, installSnapshot.getTotalChunks());
1152 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1156 public void testLeaderInstallSnapshotState() {
1157 logStart("testLeaderInstallSnapshotState");
1159 Map<String, String> leadersSnapshot = new HashMap<>();
1160 leadersSnapshot.put("1", "A");
1161 leadersSnapshot.put("2", "B");
1162 leadersSnapshot.put("3", "C");
1164 ByteString bs = toByteString(leadersSnapshot);
1165 byte[] barray = bs.toByteArray();
1167 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1168 fts.setSnapshotBytes(bs);
1170 assertEquals(bs.size(), barray.length);
1173 for (int i=0; i < barray.length; i = i + 50) {
1177 if (i + 50 > barray.length) {
1181 byte[] chunk = fts.getNextChunk();
1182 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
1183 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1185 fts.markSendStatus(true);
1186 if (!fts.isLastChunk(chunkIndex)) {
1187 fts.incrementChunkIndex();
1191 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1195 protected Leader createBehavior(final RaftActorContext actorContext) {
1196 return new Leader(actorContext);
1200 protected MockRaftActorContext createActorContext() {
1201 return createActorContext(leaderActor);
1205 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1206 return createActorContext(LEADER_ID, actorRef);
1209 private MockRaftActorContext createActorContextWithFollower() {
1210 MockRaftActorContext actorContext = createActorContext();
1211 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1212 followerActor.path().toString()).build());
1213 return actorContext;
1216 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1217 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1218 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1219 configParams.setElectionTimeoutFactor(100000);
1220 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1221 context.setConfigParams(configParams);
1222 context.setPayloadVersion(payloadVersion);
1226 private MockRaftActorContext createFollowerActorContextWithLeader() {
1227 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1228 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1229 followerConfig.setElectionTimeoutFactor(10000);
1230 followerActorContext.setConfigParams(followerConfig);
1231 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1232 return followerActorContext;
1236 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1237 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1239 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1241 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1243 Follower follower = new Follower(followerActorContext);
1244 followerActor.underlyingActor().setBehavior(follower);
1245 followerActorContext.setCurrentBehavior(follower);
1247 Map<String, String> peerAddresses = new HashMap<>();
1248 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1250 leaderActorContext.setPeerAddresses(peerAddresses);
1252 leaderActorContext.getReplicatedLog().removeFrom(0);
1255 leaderActorContext.setReplicatedLog(
1256 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1258 leaderActorContext.setCommitIndex(1);
1260 followerActorContext.getReplicatedLog().removeFrom(0);
1262 // follower too has the exact same log entries and has the same commit index
1263 followerActorContext.setReplicatedLog(
1264 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1266 followerActorContext.setCommitIndex(1);
1268 leader = new Leader(leaderActorContext);
1269 leaderActorContext.setCurrentBehavior(leader);
1271 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1273 assertEquals(-1, appendEntries.getLeaderCommit());
1274 assertEquals(0, appendEntries.getEntries().size());
1275 assertEquals(0, appendEntries.getPrevLogIndex());
1277 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1278 leaderActor, AppendEntriesReply.class);
1280 assertEquals(2, appendEntriesReply.getLogLastIndex());
1281 assertEquals(1, appendEntriesReply.getLogLastTerm());
1283 // follower returns its next index
1284 assertEquals(2, appendEntriesReply.getLogLastIndex());
1285 assertEquals(1, appendEntriesReply.getLogLastTerm());
1291 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1292 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1294 MockRaftActorContext leaderActorContext = createActorContext();
1296 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1297 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1299 Follower follower = new Follower(followerActorContext);
1300 followerActor.underlyingActor().setBehavior(follower);
1301 followerActorContext.setCurrentBehavior(follower);
1303 Map<String, String> leaderPeerAddresses = new HashMap<>();
1304 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1306 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1308 leaderActorContext.getReplicatedLog().removeFrom(0);
1310 leaderActorContext.setReplicatedLog(
1311 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1313 leaderActorContext.setCommitIndex(1);
1315 followerActorContext.getReplicatedLog().removeFrom(0);
1317 followerActorContext.setReplicatedLog(
1318 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1320 // follower has the same log entries but its commit index > leaders commit index
1321 followerActorContext.setCommitIndex(2);
1323 leader = new Leader(leaderActorContext);
1325 // Initial heartbeat
1326 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1328 assertEquals(-1, appendEntries.getLeaderCommit());
1329 assertEquals(0, appendEntries.getEntries().size());
1330 assertEquals(0, appendEntries.getPrevLogIndex());
1332 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1333 leaderActor, AppendEntriesReply.class);
1335 assertEquals(2, appendEntriesReply.getLogLastIndex());
1336 assertEquals(1, appendEntriesReply.getLogLastTerm());
1338 leaderActor.underlyingActor().setBehavior(follower);
1339 leader.handleMessage(followerActor, appendEntriesReply);
1341 leaderActor.underlyingActor().clear();
1342 followerActor.underlyingActor().clear();
1344 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1345 TimeUnit.MILLISECONDS);
1347 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1349 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1351 assertEquals(2, appendEntries.getLeaderCommit());
1352 assertEquals(0, appendEntries.getEntries().size());
1353 assertEquals(2, appendEntries.getPrevLogIndex());
1355 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1357 assertEquals(2, appendEntriesReply.getLogLastIndex());
1358 assertEquals(1, appendEntriesReply.getLogLastTerm());
1360 assertEquals(2, followerActorContext.getCommitIndex());
1366 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1367 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1369 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1370 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1371 new FiniteDuration(1000, TimeUnit.SECONDS));
1373 leaderActorContext.setReplicatedLog(
1374 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1375 long leaderCommitIndex = 2;
1376 leaderActorContext.setCommitIndex(leaderCommitIndex);
1377 leaderActorContext.setLastApplied(leaderCommitIndex);
1379 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1380 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1382 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1384 followerActorContext.setReplicatedLog(
1385 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1386 followerActorContext.setCommitIndex(0);
1387 followerActorContext.setLastApplied(0);
1389 Follower follower = new Follower(followerActorContext);
1390 followerActor.underlyingActor().setBehavior(follower);
1392 leader = new Leader(leaderActorContext);
1394 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1395 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1397 MessageCollectorActor.clearMessages(followerActor);
1398 MessageCollectorActor.clearMessages(leaderActor);
1400 // Verify initial AppendEntries sent.
1401 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1402 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1403 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1405 leaderActor.underlyingActor().setBehavior(leader);
1407 leader.handleMessage(followerActor, appendEntriesReply);
1409 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1410 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1412 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1413 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1414 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1416 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1417 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1418 appendEntries.getEntries().get(0).getData());
1419 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1420 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1421 appendEntries.getEntries().get(1).getData());
1423 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1424 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1426 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1428 ApplyState applyState = applyStateList.get(0);
1429 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1430 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1431 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1432 applyState.getReplicatedLogEntry().getData());
1434 applyState = applyStateList.get(1);
1435 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1436 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1437 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1438 applyState.getReplicatedLogEntry().getData());
1440 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1441 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1445 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1446 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1448 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1449 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1450 new FiniteDuration(1000, TimeUnit.SECONDS));
1452 leaderActorContext.setReplicatedLog(
1453 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1454 long leaderCommitIndex = 1;
1455 leaderActorContext.setCommitIndex(leaderCommitIndex);
1456 leaderActorContext.setLastApplied(leaderCommitIndex);
1458 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1459 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1461 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1463 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1464 followerActorContext.setCommitIndex(-1);
1465 followerActorContext.setLastApplied(-1);
1467 Follower follower = new Follower(followerActorContext);
1468 followerActor.underlyingActor().setBehavior(follower);
1469 followerActorContext.setCurrentBehavior(follower);
1471 leader = new Leader(leaderActorContext);
1473 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1474 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1476 MessageCollectorActor.clearMessages(followerActor);
1477 MessageCollectorActor.clearMessages(leaderActor);
1479 // Verify initial AppendEntries sent with the leader's current commit index.
1480 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1481 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1482 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1484 leaderActor.underlyingActor().setBehavior(leader);
1485 leaderActorContext.setCurrentBehavior(leader);
1487 leader.handleMessage(followerActor, appendEntriesReply);
1489 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1490 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1492 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1493 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1494 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1496 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1497 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1498 appendEntries.getEntries().get(0).getData());
1499 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1500 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1501 appendEntries.getEntries().get(1).getData());
1503 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1504 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1506 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1508 ApplyState applyState = applyStateList.get(0);
1509 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1510 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1511 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1512 applyState.getReplicatedLogEntry().getData());
1514 applyState = applyStateList.get(1);
1515 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1516 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1517 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1518 applyState.getReplicatedLogEntry().getData());
1520 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1521 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1525 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1526 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1528 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1529 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1530 new FiniteDuration(1000, TimeUnit.SECONDS));
1532 leaderActorContext.setReplicatedLog(
1533 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1534 long leaderCommitIndex = 1;
1535 leaderActorContext.setCommitIndex(leaderCommitIndex);
1536 leaderActorContext.setLastApplied(leaderCommitIndex);
1538 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1539 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1541 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1543 followerActorContext.setReplicatedLog(
1544 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1545 followerActorContext.setCommitIndex(-1);
1546 followerActorContext.setLastApplied(-1);
1548 Follower follower = new Follower(followerActorContext);
1549 followerActor.underlyingActor().setBehavior(follower);
1550 followerActorContext.setCurrentBehavior(follower);
1552 leader = new Leader(leaderActorContext);
1554 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1555 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1557 MessageCollectorActor.clearMessages(followerActor);
1558 MessageCollectorActor.clearMessages(leaderActor);
1560 // Verify initial AppendEntries sent with the leader's current commit index.
1561 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1562 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1563 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1565 leaderActor.underlyingActor().setBehavior(leader);
1566 leaderActorContext.setCurrentBehavior(leader);
1568 leader.handleMessage(followerActor, appendEntriesReply);
1570 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1571 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1573 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1574 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1575 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1577 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1578 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1579 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1580 appendEntries.getEntries().get(0).getData());
1581 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1582 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1583 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1584 appendEntries.getEntries().get(1).getData());
1586 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1587 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1589 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1591 ApplyState applyState = applyStateList.get(0);
1592 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1593 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1594 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1595 applyState.getReplicatedLogEntry().getData());
1597 applyState = applyStateList.get(1);
1598 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1599 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1600 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1601 applyState.getReplicatedLogEntry().getData());
1603 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1604 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1605 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1609 public void testHandleAppendEntriesReplyWithNewerTerm(){
1610 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1612 MockRaftActorContext leaderActorContext = createActorContext();
1613 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1614 new FiniteDuration(10000, TimeUnit.SECONDS));
1616 leaderActorContext.setReplicatedLog(
1617 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1619 leader = new Leader(leaderActorContext);
1620 leaderActor.underlyingActor().setBehavior(leader);
1621 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1623 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1625 assertEquals(false, appendEntriesReply.isSuccess());
1626 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1628 MessageCollectorActor.clearMessages(leaderActor);
1632 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1633 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1635 MockRaftActorContext leaderActorContext = createActorContext();
1636 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1637 new FiniteDuration(10000, TimeUnit.SECONDS));
1639 leaderActorContext.setReplicatedLog(
1640 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1641 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1643 leader = new Leader(leaderActorContext);
1644 leaderActor.underlyingActor().setBehavior(leader);
1645 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1647 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1649 assertEquals(false, appendEntriesReply.isSuccess());
1650 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1652 MessageCollectorActor.clearMessages(leaderActor);
1656 public void testHandleAppendEntriesReplySuccess() throws Exception {
1657 logStart("testHandleAppendEntriesReplySuccess");
1659 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1661 leaderActorContext.setReplicatedLog(
1662 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1664 leaderActorContext.setCommitIndex(1);
1665 leaderActorContext.setLastApplied(1);
1666 leaderActorContext.getTermInformation().update(1, "leader");
1668 leader = new Leader(leaderActorContext);
1670 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1672 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1673 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1675 short payloadVersion = 5;
1676 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1678 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1680 assertEquals(RaftState.Leader, raftActorBehavior.state());
1682 assertEquals(2, leaderActorContext.getCommitIndex());
1684 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1685 leaderActor, ApplyJournalEntries.class);
1687 assertEquals(2, leaderActorContext.getLastApplied());
1689 assertEquals(2, applyJournalEntries.getToIndex());
1691 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1694 assertEquals(1,applyStateList.size());
1696 ApplyState applyState = applyStateList.get(0);
1698 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1700 assertEquals(2, followerInfo.getMatchIndex());
1701 assertEquals(3, followerInfo.getNextIndex());
1702 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1703 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1707 public void testHandleAppendEntriesReplyUnknownFollower(){
1708 logStart("testHandleAppendEntriesReplyUnknownFollower");
1710 MockRaftActorContext leaderActorContext = createActorContext();
1712 leader = new Leader(leaderActorContext);
1714 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1716 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1718 assertEquals(RaftState.Leader, raftActorBehavior.state());
1722 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1723 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1725 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1726 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1727 new FiniteDuration(1000, TimeUnit.SECONDS));
1728 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1730 leaderActorContext.setReplicatedLog(
1731 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1732 long leaderCommitIndex = 3;
1733 leaderActorContext.setCommitIndex(leaderCommitIndex);
1734 leaderActorContext.setLastApplied(leaderCommitIndex);
1736 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1737 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1738 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1739 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1741 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1743 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1744 followerActorContext.setCommitIndex(-1);
1745 followerActorContext.setLastApplied(-1);
1747 Follower follower = new Follower(followerActorContext);
1748 followerActor.underlyingActor().setBehavior(follower);
1749 followerActorContext.setCurrentBehavior(follower);
1751 leader = new Leader(leaderActorContext);
1753 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1754 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1756 MessageCollectorActor.clearMessages(followerActor);
1757 MessageCollectorActor.clearMessages(leaderActor);
1759 // Verify initial AppendEntries sent with the leader's current commit index.
1760 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1761 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1762 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1764 leaderActor.underlyingActor().setBehavior(leader);
1765 leaderActorContext.setCurrentBehavior(leader);
1767 leader.handleMessage(followerActor, appendEntriesReply);
1769 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1770 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1772 appendEntries = appendEntriesList.get(0);
1773 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1774 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1775 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1777 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1778 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1779 appendEntries.getEntries().get(0).getData());
1780 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1781 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1782 appendEntries.getEntries().get(1).getData());
1784 appendEntries = appendEntriesList.get(1);
1785 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1786 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1787 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1789 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1790 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1791 appendEntries.getEntries().get(0).getData());
1792 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1793 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1794 appendEntries.getEntries().get(1).getData());
1796 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1797 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1799 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1801 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1802 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1806 public void testHandleRequestVoteReply(){
1807 logStart("testHandleRequestVoteReply");
1809 MockRaftActorContext leaderActorContext = createActorContext();
1811 leader = new Leader(leaderActorContext);
1813 // Should be a no-op.
1814 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1815 new RequestVoteReply(1, true));
1817 assertEquals(RaftState.Leader, raftActorBehavior.state());
1819 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1821 assertEquals(RaftState.Leader, raftActorBehavior.state());
1825 public void testIsolatedLeaderCheckNoFollowers() {
1826 logStart("testIsolatedLeaderCheckNoFollowers");
1828 MockRaftActorContext leaderActorContext = createActorContext();
1830 leader = new Leader(leaderActorContext);
1831 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1832 assertTrue(behavior instanceof Leader);
1836 public void testIsolatedLeaderCheckNoVotingFollowers() {
1837 logStart("testIsolatedLeaderCheckNoVotingFollowers");
1839 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1840 Follower follower = new Follower(followerActorContext);
1841 followerActor.underlyingActor().setBehavior(follower);
1843 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1844 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1845 new FiniteDuration(1000, TimeUnit.SECONDS));
1846 leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1848 leader = new Leader(leaderActorContext);
1849 leader.getFollower(FOLLOWER_ID).markFollowerActive();
1850 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1851 assertTrue("Expected Leader", behavior instanceof Leader);
1854 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1855 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1856 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1858 MockRaftActorContext leaderActorContext = createActorContext();
1860 Map<String, String> peerAddresses = new HashMap<>();
1861 peerAddresses.put("follower-1", followerActor1.path().toString());
1862 peerAddresses.put("follower-2", followerActor2.path().toString());
1864 leaderActorContext.setPeerAddresses(peerAddresses);
1865 leaderActorContext.setRaftPolicy(raftPolicy);
1867 leader = new Leader(leaderActorContext);
1869 leader.markFollowerActive("follower-1");
1870 leader.markFollowerActive("follower-2");
1871 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1872 assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
1874 // kill 1 follower and verify if that got killed
1875 final JavaTestKit probe = new JavaTestKit(getSystem());
1876 probe.watch(followerActor1);
1877 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1878 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1879 assertEquals(termMsg1.getActor(), followerActor1);
1881 leader.markFollowerInActive("follower-1");
1882 leader.markFollowerActive("follower-2");
1883 behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1884 assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
1886 // kill 2nd follower and leader should change to Isolated leader
1887 followerActor2.tell(PoisonPill.getInstance(), null);
1888 probe.watch(followerActor2);
1889 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1890 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1891 assertEquals(termMsg2.getActor(), followerActor2);
1893 leader.markFollowerInActive("follower-2");
1894 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1898 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1899 logStart("testIsolatedLeaderCheckTwoFollowers");
1901 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1903 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1904 behavior instanceof IsolatedLeader);
1908 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1909 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1911 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1913 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1914 behavior instanceof Leader);
1918 public void testLaggingFollowerStarvation() throws Exception {
1919 logStart("testLaggingFollowerStarvation");
1920 new JavaTestKit(getSystem()) {{
1921 String leaderActorId = actorFactory.generateActorId("leader");
1922 String follower1ActorId = actorFactory.generateActorId("follower");
1923 String follower2ActorId = actorFactory.generateActorId("follower");
1925 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1926 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1927 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1928 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1930 MockRaftActorContext leaderActorContext =
1931 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1933 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1934 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1935 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1937 leaderActorContext.setConfigParams(configParams);
1939 leaderActorContext.setReplicatedLog(
1940 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1942 Map<String, String> peerAddresses = new HashMap<>();
1943 peerAddresses.put(follower1ActorId,
1944 follower1Actor.path().toString());
1945 peerAddresses.put(follower2ActorId,
1946 follower2Actor.path().toString());
1948 leaderActorContext.setPeerAddresses(peerAddresses);
1949 leaderActorContext.getTermInformation().update(1, leaderActorId);
1951 RaftActorBehavior leader = createBehavior(leaderActorContext);
1953 leaderActor.underlyingActor().setBehavior(leader);
1955 for(int i=1;i<6;i++) {
1956 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1957 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1958 assertTrue(newBehavior == leader);
1959 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1962 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1963 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1965 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1966 heartbeats.size() > 1);
1968 // Check if follower-2 got AppendEntries during this time and was not starved
1969 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1971 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1972 appendEntries.size() > 1);
1978 public void testReplicationConsensusWithNonVotingFollower() {
1979 logStart("testReplicationConsensusWithNonVotingFollower");
1981 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1982 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1983 new FiniteDuration(1000, TimeUnit.SECONDS));
1985 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1986 leaderActorContext.setCommitIndex(-1);
1987 leaderActorContext.setLastApplied(-1);
1989 String nonVotingFollowerId = "nonvoting-follower";
1990 TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1991 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1993 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1995 leader = new Leader(leaderActorContext);
1996 leaderActorContext.setCurrentBehavior(leader);
1998 // Ignore initial heartbeats
1999 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2000 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2002 MessageCollectorActor.clearMessages(followerActor);
2003 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2004 MessageCollectorActor.clearMessages(leaderActor);
2006 // Send a Replicate message and wait for AppendEntries.
2007 sendReplicate(leaderActorContext, 0);
2009 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2010 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2012 // Send reply only from the voting follower and verify consensus via ApplyState.
2013 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2015 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2017 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2019 MessageCollectorActor.clearMessages(followerActor);
2020 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2021 MessageCollectorActor.clearMessages(leaderActor);
2023 // Send another Replicate message
2024 sendReplicate(leaderActorContext, 1);
2026 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2027 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2028 AppendEntries.class);
2029 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2030 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2032 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2033 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2035 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2037 // Send reply from the voting follower and verify consensus.
2038 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2040 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2044 public void testTransferLeadershipWithFollowerInSync() {
2045 logStart("testTransferLeadershipWithFollowerInSync");
2047 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2048 leaderActorContext.setLastApplied(-1);
2049 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2050 new FiniteDuration(1000, TimeUnit.SECONDS));
2051 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2053 leader = new Leader(leaderActorContext);
2054 leaderActorContext.setCurrentBehavior(leader);
2056 // Initial heartbeat
2057 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2058 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2059 MessageCollectorActor.clearMessages(followerActor);
2061 sendReplicate(leaderActorContext, 0);
2062 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2064 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2065 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2066 MessageCollectorActor.clearMessages(followerActor);
2068 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2069 leader.transferLeadership(mockTransferCohort);
2071 verify(mockTransferCohort, never()).transferComplete();
2072 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2073 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2075 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2076 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2078 // Leader should force an election timeout
2079 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2081 verify(mockTransferCohort).transferComplete();
2085 public void testTransferLeadershipWithEmptyLog() {
2086 logStart("testTransferLeadershipWithEmptyLog");
2088 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2089 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2090 new FiniteDuration(1000, TimeUnit.SECONDS));
2091 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2093 leader = new Leader(leaderActorContext);
2094 leaderActorContext.setCurrentBehavior(leader);
2096 // Initial heartbeat
2097 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2098 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2099 MessageCollectorActor.clearMessages(followerActor);
2101 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2102 leader.transferLeadership(mockTransferCohort);
2104 verify(mockTransferCohort, never()).transferComplete();
2105 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2106 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2108 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2109 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2111 // Leader should force an election timeout
2112 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2114 verify(mockTransferCohort).transferComplete();
2118 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2119 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2121 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2122 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2123 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2125 leader = new Leader(leaderActorContext);
2126 leaderActorContext.setCurrentBehavior(leader);
2128 // Initial heartbeat
2129 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2130 MessageCollectorActor.clearMessages(followerActor);
2132 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2133 leader.transferLeadership(mockTransferCohort);
2135 verify(mockTransferCohort, never()).transferComplete();
2137 // Sync up the follower.
2138 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2139 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2140 MessageCollectorActor.clearMessages(followerActor);
2142 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2143 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2144 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2145 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2146 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2148 // Leader should force an election timeout
2149 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2151 verify(mockTransferCohort).transferComplete();
2155 public void testTransferLeadershipWithFollowerSyncTimeout() {
2156 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2158 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2159 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2160 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2161 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2162 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2164 leader = new Leader(leaderActorContext);
2165 leaderActorContext.setCurrentBehavior(leader);
2167 // Initial heartbeat
2168 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2169 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2170 MessageCollectorActor.clearMessages(followerActor);
2172 sendReplicate(leaderActorContext, 0);
2173 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2175 MessageCollectorActor.clearMessages(followerActor);
2177 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2178 leader.transferLeadership(mockTransferCohort);
2180 verify(mockTransferCohort, never()).transferComplete();
2182 // Send heartbeats to time out the transfer.
2183 for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2184 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2185 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2186 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2189 verify(mockTransferCohort).abortTransfer();
2190 verify(mockTransferCohort, never()).transferComplete();
2191 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2195 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2196 ActorRef actorRef, RaftRPC rpc) throws Exception {
2197 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2198 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2201 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2203 private final long electionTimeOutIntervalMillis;
2204 private final int snapshotChunkSize;
2206 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2208 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2209 this.snapshotChunkSize = snapshotChunkSize;
2213 public FiniteDuration getElectionTimeOutInterval() {
2214 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2218 public int getSnapshotChunkSize() {
2219 return snapshotChunkSize;