2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorRef;
20 import akka.actor.PoisonPill;
21 import akka.actor.Props;
22 import akka.actor.Terminated;
23 import akka.testkit.JavaTestKit;
24 import akka.testkit.TestActorRef;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.google.protobuf.ByteString;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.List;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
37 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
38 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
39 import org.opendaylight.controller.cluster.raft.RaftActorContext;
40 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
41 import org.opendaylight.controller.cluster.raft.RaftState;
42 import org.opendaylight.controller.cluster.raft.RaftVersions;
43 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
44 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
45 import org.opendaylight.controller.cluster.raft.Snapshot;
46 import org.opendaylight.controller.cluster.raft.VotingState;
47 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
48 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
49 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
50 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
51 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
53 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
54 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
56 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
57 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
58 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
59 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
60 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
61 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
62 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
63 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
64 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
65 import org.opendaylight.yangtools.concepts.Identifier;
66 import scala.concurrent.duration.FiniteDuration;
68 public class LeaderTest extends AbstractLeaderTest<Leader> {
// Identifier used for the follower in the AppendEntriesReply messages built by these tests.
70 static final String FOLLOWER_ID = "follower";
// Identifier for the leader. NOTE(review): not referenced in the visible portion of this file.
71 public static final String LEADER_ID = "leader";
// Test actor standing in for the leader: used as the sender of SendHeartBeat/Replicate messages
// and queried for ApplyState messages.
73 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
74 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
// Test actor standing in for the follower: the tests query it (via MessageCollectorActor) for the
// AppendEntries / InstallSnapshot messages the leader sent.
76 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
77 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
// Behavior under test; each test assigns a freshly constructed Leader.
79 private Leader leader;
// NOTE(review): testThatLeaderSendsAHeartbeatMessageToAllFollowers declares a local with the same
// name and value, which shadows this field there.
80 private final short payloadVersion = 5;
84 public void tearDown() throws Exception {
/**
 * Verifies that a Leader returns null from handleMessage when given a message it does not
 * recognize (here the String "foo").
 */
93 public void testHandleMessageForUnknownMessage() throws Exception {
94 logStart("testHandleMessageForUnknownMessage");
96 leader = new Leader(createActorContext());
98 // handleMessage should return null when it receives an unknown message
99 assertNull(leader.handleMessage(followerActor, "foo"));
/**
 * Checks the AppendEntries messages a Leader sends to its follower: first the one observed right
 * after the Leader is constructed (asserted to carry no entries), then the one sent in response to
 * SendHeartBeat once the follower has replied (asserted to carry the entry at lastIndex).
 */
103 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
104 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
106 MockRaftActorContext actorContext = createActorContextWithFollower();
107 actorContext.setCommitIndex(-1);
// NOTE(review): this local shadows the payloadVersion field, which holds the same value.
108 short payloadVersion = (short)5;
109 actorContext.setPayloadVersion(payloadVersion);
112 actorContext.getTermInformation().update(term, "");
114 leader = new Leader(actorContext);
115 actorContext.setCurrentBehavior(leader);
117 // Leader should send an immediate heartbeat with no entries as follower is inactive.
118 long lastIndex = actorContext.getReplicatedLog().lastIndex();
119 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
120 assertEquals("getTerm", term, appendEntries.getTerm());
121 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
122 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
123 assertEquals("Entries size", 0, appendEntries.getEntries().size());
124 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
126 // The follower would normally reply - simulate that explicitly here.
// The reply reports lastIndex - 1, i.e. one entry behind the leader's log; the assertions below
// expect the next AppendEntries to carry exactly the entry at lastIndex.
127 leader.handleMessage(followerActor, new AppendEntriesReply(
128 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
129 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the messages collected so far so that the next expectFirstMatching sees only new ones.
131 followerActor.underlyingActor().clear();
133 // Sleep for the heartbeat interval so AppendEntries is sent.
134 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
135 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
137 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
139 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
140 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
141 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
142 assertEquals("Entries size", 1, appendEntries.getEntries().size());
143 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
144 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
145 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
/**
 * Convenience overload of sendReplicate that uses term 1 for the new entry.
 *
 * @param actorContext context whose replicated log receives the new entry
 * @param index index of the new entry
 * @return the behavior returned by the three-argument sendReplicate
 */
149 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
150 return sendReplicate(actorContext, 1, index);
/**
 * Appends a new MockReplicatedLogEntry with payload "foo" at the given term and index to the
 * context's replicated log, then hands the leader a Replicate message for that entry.
 *
 * @param actorContext context whose replicated log receives the new entry
 * @param term term of the new entry
 * @param index index of the new entry
 * @return the behavior returned by leader.handleMessage
 */
153 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
154 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
155 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
156 term, index, payload);
157 actorContext.getReplicatedLog().append(newEntry);
// The first two Replicate arguments are passed as null; only the log entry is supplied.
158 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
/**
 * Verifies that handling a Replicate message makes the leader send the follower an AppendEntries
 * containing the new entry, that the returned behavior is still a Leader, and that the commit
 * index is still the previous last index afterwards.
 */
162 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
163 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
165 MockRaftActorContext actorContext = createActorContextWithFollower();
168 actorContext.getTermInformation().update(term, "");
170 leader = new Leader(actorContext);
172 // Leader will send an immediate heartbeat - ignore it.
173 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
175 // The follower would normally reply - simulate that explicitly here.
176 long lastIndex = actorContext.getReplicatedLog().lastIndex();
177 leader.handleMessage(followerActor, new AppendEntriesReply(
178 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
179 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the messages collected so far so that only the AppendEntries for the new entry is examined.
181 followerActor.underlyingActor().clear();
183 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
185 // State should not change
186 assertTrue(raftBehavior instanceof Leader);
188 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
189 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
190 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
191 assertEquals("Entries size", 1, appendEntries.getEntries().size());
192 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
193 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
194 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// No reply for the new entry has been handed to the leader at this point; the commit index is
// expected to still be lastIndex.
195 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
/**
 * Exercises the Raft rule quoted in the comments below (§5.4.1): log entries left over from an
 * earlier term are not committed when the follower reports them replicated; the commit index only
 * advances once an entry from the leader's current term has been acknowledged.
 */
199 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
200 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
202 MockRaftActorContext actorContext = createActorContextWithFollower();
203 actorContext.setCommitIndex(-1);
205 // The raft context is initialized with a couple log entries. However the commitIndex
206 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
207 // committed and applied. Now it regains leadership with a higher term (2).
208 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
209 long newTerm = prevTerm + 1;
210 actorContext.getTermInformation().update(newTerm, "");
212 leader = new Leader(actorContext);
213 actorContext.setCurrentBehavior(leader);
215 // Leader will send an immediate heartbeat - ignore it.
216 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
218 // The follower replies with the leader's current last index and term, simulating that it is
219 // up to date with the leader.
220 long lastIndex = actorContext.getReplicatedLog().lastIndex();
221 leader.handleMessage(followerActor, new AppendEntriesReply(
222 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
224 // The commit index should not get updated even though consensus was reached. This is b/c the
225 // last entry's term does not match the current term. As per §5.4.1, "Raft never commits log entries
226 // from previous terms by counting replicas".
227 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
// Drop the messages collected so far so that only the AppendEntries for the new entry is examined.
229 followerActor.underlyingActor().clear();
231 // Now replicate a new entry with the new term 2.
232 long newIndex = lastIndex + 1;
233 sendReplicate(actorContext, newTerm, newIndex);
235 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
236 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
237 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
238 assertEquals("Entries size", 1, appendEntries.getEntries().size());
239 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
240 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
241 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
243 // The follower replies with success. The leader should now update the commit index to the new index
244 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
245 // prior entries are committed indirectly".
246 leader.handleMessage(followerActor, new AppendEntriesReply(
247 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
249 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
/**
 * With the policy returned by createRaftPolicy(true, true) installed on the context, verifies
 * that handling a Replicate message sends the new entry to the follower and that the commit index
 * already equals lastIndex + 1 before any reply for that entry has been handled.
 */
253 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
254 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
256 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): the meaning of the two boolean arguments is defined by createRaftPolicy, which is
// outside the visible portion of this file.
257 actorContext.setRaftPolicy(createRaftPolicy(true, true));
260 actorContext.getTermInformation().update(term, "");
262 leader = new Leader(actorContext);
264 // Leader will send an immediate heartbeat - ignore it.
265 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
267 // The follower would normally reply - simulate that explicitly here.
268 long lastIndex = actorContext.getReplicatedLog().lastIndex();
269 leader.handleMessage(followerActor, new AppendEntriesReply(
270 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
271 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the messages collected so far so that only the AppendEntries for the new entry is examined.
273 followerActor.underlyingActor().clear();
275 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
277 // State should not change
278 assertTrue(raftBehavior instanceof Leader);
280 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
281 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
282 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
283 assertEquals("Entries size", 1, appendEntries.getEntries().size());
284 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
285 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
286 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// The follower has not acknowledged the new entry, yet the commit index is expected to have moved.
287 assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
/**
 * Verifies that five back-to-back Replicate messages, with no intervening AppendEntriesReply and
 * with the heartbeat interval overridden to 5 seconds, result in exactly one AppendEntries being
 * collected by the follower.
 */
291 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
// Log this test's own name so the log output identifies the test that is actually running.
292 logStart("testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent");
294 MockRaftActorContext actorContext = createActorContextWithFollower();
// Override the heartbeat interval with 5 seconds; the assertion below relies on it not expiring.
295 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
297 public FiniteDuration getHeartBeatInterval() {
298 return FiniteDuration.apply(5, TimeUnit.SECONDS);
303 actorContext.getTermInformation().update(term, "");
305 leader = new Leader(actorContext);
307 // Leader will send an immediate heartbeat - ignore it.
308 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
310 // The follower would normally reply - simulate that explicitly here.
311 long lastIndex = actorContext.getReplicatedLog().lastIndex();
312 leader.handleMessage(followerActor, new AppendEntriesReply(
313 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
314 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the messages collected so far so that only AppendEntries caused by the Replicates are counted.
316 followerActor.underlyingActor().clear();
// Replicate five consecutive entries without simulating any follower reply in between.
318 for(int i=0;i<5;i++) {
319 sendReplicate(actorContext, lastIndex+i+1);
322 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
323 // We expect only 1 message to be sent because of two reasons,
324 // - an append entries reply was not received
325 // - the heartbeat interval has not expired
326 // In this scenario if multiple messages are sent they would likely be duplicates
327 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
/**
 * Replicates five entries, simulating a successful follower reply after each of the first three
 * only, and verifies that exactly four AppendEntries are collected by the follower and that the
 * first entry of each has the expected index.
 */
331 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
332 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
334 MockRaftActorContext actorContext = createActorContextWithFollower();
// Override the heartbeat interval with 5 seconds so sends are driven by Replicate/reply handling only.
335 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
337 public FiniteDuration getHeartBeatInterval() {
338 return FiniteDuration.apply(5, TimeUnit.SECONDS);
343 actorContext.getTermInformation().update(term, "");
345 leader = new Leader(actorContext);
347 // Leader will send an immediate heartbeat - ignore it.
348 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
350 // The follower would normally reply - simulate that explicitly here.
351 long lastIndex = actorContext.getReplicatedLog().lastIndex();
352 leader.handleMessage(followerActor, new AppendEntriesReply(
353 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
354 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the messages collected so far so that only AppendEntries caused by the Replicates are counted.
356 followerActor.underlyingActor().clear();
// First three entries: each Replicate is followed by a successful reply acknowledging that entry.
358 for(int i=0;i<3;i++) {
359 sendReplicate(actorContext, lastIndex+i+1);
360 leader.handleMessage(followerActor, new AppendEntriesReply(
361 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
// Last two entries: replicated without any reply from the follower.
365 for(int i=3;i<5;i++) {
366 sendReplicate(actorContext, lastIndex + i + 1);
369 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
370 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
371 // get sent to the follower - but not the 5th
372 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
374 for(int i=0;i<4;i++) {
// JUnit's assertEquals takes (expected, actual): the computed index is the expectation and the
// index read from the collected message is the actual value, so failures are reported correctly.
// NOTE(review): i + 2 presumes lastIndex is 1 for the context built by
// createActorContextWithFollower() - confirm.
375 long actualIndex = allMessages.get(i).getEntries().get(0).getIndex();
376 assertEquals("Entry getIndex", i + 2, actualIndex);
/**
 * Verifies that when a replicated entry has not been acknowledged by the follower, a subsequent
 * SendHeartBeat (handled after the 500 ms heartbeat interval has elapsed) sends the same entry
 * again: two AppendEntries are collected, each carrying the single entry at lastIndex + 1.
 */
381 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
382 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
384 MockRaftActorContext actorContext = createActorContextWithFollower();
// Override the heartbeat interval with 500 ms; the sleep below is chosen to exceed it.
385 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
387 public FiniteDuration getHeartBeatInterval() {
388 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
393 actorContext.getTermInformation().update(term, "");
395 leader = new Leader(actorContext);
397 // Leader will send an immediate heartbeat - ignore it.
398 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
400 // The follower would normally reply - simulate that explicitly here.
401 long lastIndex = actorContext.getReplicatedLog().lastIndex();
402 leader.handleMessage(followerActor, new AppendEntriesReply(
403 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
404 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the messages collected so far so that only the two AppendEntries of interest are counted.
406 followerActor.underlyingActor().clear();
408 sendReplicate(actorContext, lastIndex+1);
410 // Wait slightly longer than heartbeat duration
411 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
413 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
415 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
416 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// First message: the AppendEntries triggered by the Replicate.
418 assertEquals(1, allMessages.get(0).getEntries().size());
419 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
// Second message: the duplicate sent on heartbeat. Check the entry index of message 1 itself
// rather than re-checking message 0.
420 assertEquals(1, allMessages.get(1).getEntries().size());
421 assertEquals(lastIndex+1, allMessages.get(1).getEntries().get(0).getIndex());
/**
 * Handles SendHeartBeat three times, sleeping 150 ms before each (the heartbeat interval is
 * overridden to 100 ms), and verifies that three AppendEntries are collected by the follower.
 */
426 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
427 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
429 MockRaftActorContext actorContext = createActorContextWithFollower();
// Override the heartbeat interval with 100 ms; the 150 ms sleeps below exceed it.
430 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
432 public FiniteDuration getHeartBeatInterval() {
433 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
438 actorContext.getTermInformation().update(term, "");
440 leader = new Leader(actorContext);
442 // Leader will send an immediate heartbeat - ignore it.
443 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
445 // The follower would normally reply - simulate that explicitly here.
446 long lastIndex = actorContext.getReplicatedLog().lastIndex();
447 leader.handleMessage(followerActor, new AppendEntriesReply(
448 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
449 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the messages collected so far so that only heartbeat-driven AppendEntries are counted.
451 followerActor.underlyingActor().clear();
// Wait longer than the heartbeat interval before each SendHeartBeat.
453 for(int i=0;i<3;i++) {
454 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
455 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
458 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
459 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
/**
 * Handles a SendHeartBeat immediately followed by a Replicate and verifies that two AppendEntries
 * are collected by the follower: the first with no entries, the second with one entry.
 */
463 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
464 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
466 MockRaftActorContext actorContext = createActorContextWithFollower();
// Override the heartbeat interval with 100 ms; the 150 ms sleep below exceeds it.
467 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
469 public FiniteDuration getHeartBeatInterval() {
470 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
475 actorContext.getTermInformation().update(term, "");
477 leader = new Leader(actorContext);
479 // Leader will send an immediate heartbeat - ignore it.
480 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
482 // The follower would normally reply - simulate that explicitly here.
483 long lastIndex = actorContext.getReplicatedLog().lastIndex();
484 leader.handleMessage(followerActor, new AppendEntriesReply(
485 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
486 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the messages collected so far so that only the two AppendEntries of interest are counted.
488 followerActor.underlyingActor().clear();
// Heartbeat first, then replicate a new entry with no delay in between.
490 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
491 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
492 sendReplicate(actorContext, lastIndex+1);
494 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
495 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// Message 0 is the heartbeat (no entries); message 1 carries the replicated entry.
497 assertEquals(0, allMessages.get(0).getEntries().size());
498 assertEquals(1, allMessages.get(1).getEntries().size());
/**
 * Using a context from createActorContext(), verifies that handling a Replicate message advances
 * the commit index to the new entry's index and that ApplyState messages reach leaderActor for
 * every index from 1 up to the new entry, the last one carrying the Replicate's identifier.
 */
503 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
504 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
506 MockRaftActorContext actorContext = createActorContext();
508 leader = new Leader(actorContext);
// Pretend only index 0 has been applied so that the later entries still need ApplyState messages.
510 actorContext.setLastApplied(0);
512 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
513 long term = actorContext.getTermInformation().getCurrentTerm();
514 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
515 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
517 actorContext.getReplicatedLog().append(newEntry);
519 final Identifier id = new MockIdentifier("state-id");
520 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
522 // State should not change
523 assertTrue(raftBehavior instanceof Leader);
525 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
527 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
528 // one since lastApplied state is 0.
529 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
530 leaderActor, ApplyState.class);
531 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
// The i-th ApplyState is expected to be for log index i + 1 (indices 1..newLogIndex).
533 for(int i = 0; i <= newLogIndex - 1; i++ ) {
534 ApplyState applyState = applyStateList.get(i);
535 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
536 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
// The final ApplyState corresponds to the newly replicated entry and carries its identifier.
539 ApplyState last = applyStateList.get((int) newLogIndex - 1);
540 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
541 assertEquals("getIdentifier", id, last.getIdentifier());
/**
 * With an install-snapshot state registered for the follower via setFollowerSnapshot, verifies
 * that a heartbeat yields an AppendEntries with no entries while the chunk is outstanding, and
 * that after markSendStatus(true) a further heartbeat yields an InstallSnapshot whose
 * lastIncludedIndex equals commitIndex.
 */
545 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
546 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
548 MockRaftActorContext actorContext = createActorContextWithFollower();
// Snapshot contents; serialized with toByteString further below.
550 Map<String, String> leadersSnapshot = new HashMap<>();
551 leadersSnapshot.put("1", "A");
552 leadersSnapshot.put("2", "B");
553 leadersSnapshot.put("3", "C");
// Remove all entries from the leader's log, starting at index 0.
556 actorContext.getReplicatedLog().removeFrom(0);
558 final int commitIndex = 3;
559 final int snapshotIndex = 2;
560 final int newEntryIndex = 4;
561 final int snapshotTerm = 1;
562 final int currentTerm = 2;
564 // set the snapshot variables in replicatedlog
565 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
566 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
567 actorContext.setCommitIndex(commitIndex);
568 //set follower timeout to 2 mins, helps during debugging
569 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
571 leader = new Leader(actorContext);
// Put the follower's recorded progress at the very start of the log.
573 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
574 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// NOTE(review): entry is not used in the visible lines of this test.
577 ReplicatedLogImplEntry entry =
578 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
579 new MockRaftActorContext.MockPayload("D"));
581 //update follower timestamp
582 leader.markFollowerActive(FOLLOWER_ID);
// Give the leader a snapshot and register the per-follower install state built from the same bytes.
584 ByteString bs = toByteString(leadersSnapshot);
585 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
586 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
587 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
588 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
589 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
591 //send first chunk and no InstallSnapshotReply received yet
593 fts.incrementChunkIndex();
595 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
596 TimeUnit.MILLISECONDS);
598 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
600 AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
602 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
604 //InstallSnapshotReply received
605 fts.markSendStatus(true);
607 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
609 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
611 assertEquals(commitIndex, is.getLastIncludedIndex());
/**
 * Verifies that replicating a new entry while the replicated log's snapshot index (3) is ahead of
 * followersLastIndex (2) leaves the behavior a Leader and puts the snapshot manager into the
 * capturing state.
 */
615 public void testSendAppendEntriesSnapshotScenario() throws Exception {
616 logStart("testSendAppendEntriesSnapshotScenario");
618 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): leadersSnapshot is populated but not read in the visible lines of this test.
620 Map<String, String> leadersSnapshot = new HashMap<>();
621 leadersSnapshot.put("1", "A");
622 leadersSnapshot.put("2", "B");
623 leadersSnapshot.put("3", "C");
// Remove all entries from the leader's log, starting at index 0.
626 actorContext.getReplicatedLog().removeFrom(0);
628 final int followersLastIndex = 2;
629 final int snapshotIndex = 3;
630 final int newEntryIndex = 4;
631 final int snapshotTerm = 1;
632 final int currentTerm = 2;
634 // set the snapshot variables in replicatedlog
635 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
636 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
637 actorContext.setCommitIndex(followersLastIndex);
639 leader = new Leader(actorContext);
641 // Leader will send an immediate heartbeat - ignore it.
642 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// New entry at index 4, appended to the leader's log and then replicated.
645 ReplicatedLogImplEntry entry =
646 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
647 new MockRaftActorContext.MockPayload("D"));
649 actorContext.getReplicatedLog().append(entry);
651 //update follower timestamp
652 leader.markFollowerActive(FOLLOWER_ID);
654 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
655 RaftActorBehavior raftBehavior = leader.handleMessage(
656 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
658 assertTrue(raftBehavior instanceof Leader);
660 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
/**
 * Verifies that, with the leader's snapshot cleared via setSnapshot(null), a Replicate starts a
 * snapshot capture whose CaptureSnapshot is flagged as install-snapshot-initiated and carries the
 * expected last-applied and last index/term values, and that a second Replicate while the capture
 * is in progress leaves the same CaptureSnapshot instance in place.
 */
664 public void testInitiateInstallSnapshot() throws Exception {
665 logStart("testInitiateInstallSnapshot");
667 MockRaftActorContext actorContext = createActorContextWithFollower();
// Remove all entries from the leader's log, starting at index 0.
670 actorContext.getReplicatedLog().removeFrom(0);
672 final int followersLastIndex = 2;
673 final int snapshotIndex = 3;
674 final int newEntryIndex = 4;
675 final int snapshotTerm = 1;
676 final int currentTerm = 2;
678 // set the snapshot variables in replicatedlog
679 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
680 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
681 actorContext.setLastApplied(3);
682 actorContext.setCommitIndex(followersLastIndex);
684 leader = new Leader(actorContext);
686 // Leader will send an immediate heartbeat - ignore it.
687 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
689 // set the snapshot as absent and check if capture-snapshot is invoked.
690 leader.setSnapshot(null);
// New entry at index 4 / term 2, appended to the leader's log and then replicated.
693 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
694 new MockRaftActorContext.MockPayload("D"));
696 actorContext.getReplicatedLog().append(entry);
698 //update follower timestamp
699 leader.markFollowerActive(FOLLOWER_ID);
701 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
703 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
705 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
// Expected values: last applied = index 3 / term 1 (the snapshot values set above),
// last = index 4 / term 2 (the entry just appended).
707 assertTrue(cs.isInstallSnapshotInitiated());
708 assertEquals(3, cs.getLastAppliedIndex());
709 assertEquals(1, cs.getLastAppliedTerm());
710 assertEquals(4, cs.getLastIndex());
711 assertEquals(2, cs.getLastTerm());
713 // if an initiate is started again when the first is in progress, it shouldn't initiate Capture
714 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
716 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
/**
 * Verifies that an unsuccessful AppendEntriesReply constructed with a trailing {@code true}
 * argument makes the leader start a snapshot capture flagged as install-snapshot-initiated with
 * the expected last-applied and last index/term values, and that a Replicate handled while the
 * capture is in progress leaves the same CaptureSnapshot instance in place.
 */
720 public void testInitiateForceInstallSnapshot() throws Exception {
721 logStart("testInitiateForceInstallSnapshot");
723 MockRaftActorContext actorContext = createActorContextWithFollower();
725 final int followersLastIndex = 2;
726 final int snapshotIndex = -1;
727 final int newEntryIndex = 4;
728 final int snapshotTerm = -1;
729 final int currentTerm = 2;
731 // set the snapshot variables in replicatedlog
732 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
733 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
734 actorContext.setLastApplied(3);
735 actorContext.setCommitIndex(followersLastIndex);
// Remove all entries from the leader's log, starting at index 0.
737 actorContext.getReplicatedLog().removeFrom(0);
739 leader = new Leader(actorContext);
741 // Leader will send an immediate heartbeat - ignore it.
742 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
744 // set the snapshot as absent and check if capture-snapshot is invoked.
745 leader.setSnapshot(null);
// Rebuild the log with four term-1 entries at indices 0..3.
747 for(int i=0;i<4;i++) {
748 actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
749 new MockRaftActorContext.MockPayload("X" + i)));
// Then add one more entry at index 4 / term 2.
753 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
754 new MockRaftActorContext.MockPayload("D"));
756 actorContext.getReplicatedLog().append(entry);
758 //update follower timestamp
759 leader.markFollowerActive(FOLLOWER_ID);
761 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
762 // installed with a SendInstallSnapshot
// NOTE(review): the final `true` argument is presumably the force-install-snapshot flag - confirm
// against the AppendEntriesReply constructor.
763 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
765 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
767 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
// Expected values: last applied = index 3 / term 1 (a term-1 entry appended in the loop above),
// last = index 4 / term 2 (the final entry appended).
769 assertTrue(cs.isInstallSnapshotInitiated());
770 assertEquals(3, cs.getLastAppliedIndex());
771 assertEquals(1, cs.getLastAppliedTerm());
772 assertEquals(4, cs.getLastIndex());
773 assertEquals(2, cs.getLastTerm());
775 // if an initiate is started again when first is in progress, it should not initiate Capture
776 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
778 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
/**
 * Checks that handling SendInstallSnapshot keeps the behavior a Leader and sends the follower an
 * InstallSnapshot with non-null data, last included index 3, last included term 1 and the current term 2.
 */
783 public void testInstallSnapshot() throws Exception {
784 logStart("testInstallSnapshot");
786 MockRaftActorContext actorContext = createActorContextWithFollower();
// Arbitrary key/value state that is serialized below and used as the snapshot payload.
788 Map<String, String> leadersSnapshot = new HashMap<>();
789 leadersSnapshot.put("1", "A");
790 leadersSnapshot.put("2", "B");
791 leadersSnapshot.put("3", "C");
// Presumably clears the leader's log from index 0 onwards.
794 actorContext.getReplicatedLog().removeFrom(0);
796 final int lastAppliedIndex = 3;
797 final int snapshotIndex = 2;
798 final int snapshotTerm = 1;
799 final int currentTerm = 2;
801 // set the snapshot variables in replicatedlog
802 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
803 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
804 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
805 actorContext.setCommitIndex(lastAppliedIndex);
806 actorContext.setLastApplied(lastAppliedIndex);
808 leader = new Leader(actorContext);
810 // Initial heartbeat.
811 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Place the follower at the very start of the log: nothing matched yet, next index 0.
813 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
814 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Snapshot with no extra log entries; the index/term arguments are presumably
// (lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm) - TODO confirm against Snapshot.create.
816 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
817 Collections.<ReplicatedLogEntry>emptyList(),
818 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
820 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
822 assertTrue(raftBehavior instanceof Leader);
824 // check if installsnapshot gets called with the correct values.
826 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
828 assertNotNull(installSnapshot.getData());
829 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
830 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
832 assertEquals(currentTerm, installSnapshot.getTerm());
/**
 * Variant of the install-snapshot check in which the replicated log reports no snapshot (index and term -1)
 * and the follower's next index is -1. Handling SendInstallSnapshot must still keep the behavior a Leader and
 * send an InstallSnapshot with non-null data, last included index 3, last included term -1 and term 2.
 */
836 public void testForceInstallSnapshot() throws Exception {
837 logStart("testForceInstallSnapshot");
839 MockRaftActorContext actorContext = createActorContextWithFollower();
// Arbitrary key/value state that is serialized below and used as the snapshot payload.
841 Map<String, String> leadersSnapshot = new HashMap<>();
842 leadersSnapshot.put("1", "A");
843 leadersSnapshot.put("2", "B");
844 leadersSnapshot.put("3", "C");
846 final int lastAppliedIndex = 3;
847 final int snapshotIndex = -1;
848 final int snapshotTerm = -1;
849 final int currentTerm = 2;
851 // set the snapshot variables in replicatedlog
852 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
853 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
854 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
855 actorContext.setCommitIndex(lastAppliedIndex);
856 actorContext.setLastApplied(lastAppliedIndex);
858 leader = new Leader(actorContext);
860 // Initial heartbeat.
861 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Both follower indices are -1 here (the sibling testInstallSnapshot uses nextIndex 0).
863 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
864 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
// Snapshot with no extra log entries; the index/term arguments are presumably
// (lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm) - TODO confirm against Snapshot.create.
866 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
867 Collections.<ReplicatedLogEntry>emptyList(),
868 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
870 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
872 assertTrue(raftBehavior instanceof Leader);
874 // check if installsnapshot gets called with the correct values.
876 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
878 assertNotNull(installSnapshot.getData());
879 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
880 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
882 assertEquals(currentTerm, installSnapshot.getTerm());
/**
 * Checks the Leader's reaction to a successful InstallSnapshotReply for the last chunk: the behavior stays a
 * Leader, no follower snapshot state remains (followerSnapshotSize() == 0), one follower is still tracked, and
 * the follower's match index / next index become commitIndex / commitIndex + 1.
 */
886 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
887 logStart("testHandleInstallSnapshotReplyLastChunk");
889 MockRaftActorContext actorContext = createActorContextWithFollower();
891 final int commitIndex = 3;
892 final int snapshotIndex = 2;
893 final int snapshotTerm = 1;
894 final int currentTerm = 2;
896 actorContext.setCommitIndex(commitIndex);
898 leader = new Leader(actorContext);
899 actorContext.setCurrentBehavior(leader);
// Place the follower at the very start of the log: nothing matched yet, next index 0.
901 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
902 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
904 // Ignore initial heartbeat.
905 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Arbitrary key/value state that is serialized below and used as the snapshot payload.
907 Map<String, String> leadersSnapshot = new HashMap<>();
908 leadersSnapshot.put("1", "A");
909 leadersSnapshot.put("2", "B");
910 leadersSnapshot.put("3", "C");
912 // set the snapshot variables in replicatedlog
914 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
915 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
916 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
918 ByteString bs = toByteString(leadersSnapshot);
919 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
920 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
// Register an in-progress snapshot transfer for the follower directly, bypassing SendInstallSnapshot.
921 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
922 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
923 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
// Advance the transfer state until its chunk index is the last chunk, so the reply below acknowledges it.
924 while(!fts.isLastChunk(fts.getChunkIndex())) {
926 fts.incrementChunkIndex();
// Presumably clears the leader's log from index 0 onwards.
930 actorContext.getReplicatedLog().removeFrom(0);
// Successful reply for the last chunk index.
932 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
933 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
935 assertTrue(raftBehavior instanceof Leader);
937 assertEquals(0, leader.followerSnapshotSize());
938 assertEquals(1, leader.followerLogSize());
939 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
941 assertEquals(commitIndex, fli.getMatchIndex());
942 assertEquals(commitIndex + 1, fli.getNextIndex());
/**
 * Checks that each successful InstallSnapshotReply makes the Leader send the next InstallSnapshot chunk
 * (chunk 1 of 3, then chunk 2 of 3, then one further InstallSnapshot), and that replying to that last message
 * does not cause another InstallSnapshot to be sent.
 */
946 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
947 logStart("testSendSnapshotfromInstallSnapshotReply");
949 MockRaftActorContext actorContext = createActorContextWithFollower();
951 final int commitIndex = 3;
952 final int snapshotIndex = 2;
953 final int snapshotTerm = 1;
954 final int currentTerm = 2;
// NOTE(review): the overridden chunk size is presumably small enough to split the serialized snapshot into
// the 3 chunks asserted below - confirm.
956 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
958 public int getSnapshotChunkSize() {
// Heartbeat and isolated-leader-check intervals of several seconds; presumably so that timer-driven messages
// do not interfere with the chunk-by-chunk exchange - confirm.
962 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
963 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
965 actorContext.setConfigParams(configParams);
966 actorContext.setCommitIndex(commitIndex);
968 leader = new Leader(actorContext);
969 actorContext.setCurrentBehavior(leader);
// Place the follower at the very start of the log: nothing matched yet, next index 0.
971 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
972 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary key/value state that is serialized below and used as the snapshot payload.
974 Map<String, String> leadersSnapshot = new HashMap<>();
975 leadersSnapshot.put("1", "A");
976 leadersSnapshot.put("2", "B");
977 leadersSnapshot.put("3", "C");
979 // set the snapshot variables in replicatedlog
980 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
981 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
982 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
984 ByteString bs = toByteString(leadersSnapshot);
985 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
986 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
987 leader.setSnapshot(snapshot);
// Start the transfer; the first chunk must be chunk 1 of 3.
989 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
991 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
993 assertEquals(1, installSnapshot.getChunkIndex());
994 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 1; collected messages are cleared first so only the next chunk is observed.
996 followerActor.underlyingActor().clear();
997 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
998 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1000 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1002 assertEquals(2, installSnapshot.getChunkIndex());
1003 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 2; another InstallSnapshot is expected (its chunk index is not asserted here).
1005 followerActor.underlyingActor().clear();
1006 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1007 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1009 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1011 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1012 followerActor.underlyingActor().clear();
1013 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1014 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
// getFirstMatching (not expectFirstMatching) is used so that the absence of a message can be asserted.
1016 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1018 assertNull(installSnapshot);
/**
 * Checks that after an unsuccessful InstallSnapshotReply carrying chunk index -1, the next SendHeartBeat makes
 * the Leader send chunk 1 of 3 to the follower again.
 */
1023 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
1024 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1026 MockRaftActorContext actorContext = createActorContextWithFollower();
1028 final int commitIndex = 3;
1029 final int snapshotIndex = 2;
1030 final int snapshotTerm = 1;
1031 final int currentTerm = 2;
// NOTE(review): the overridden chunk size is presumably small enough to split the serialized snapshot into
// the 3 chunks asserted below - confirm.
1033 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
1035 public int getSnapshotChunkSize() {
1040 actorContext.setCommitIndex(commitIndex);
1042 leader = new Leader(actorContext);
// Place the follower at the very start of the log: nothing matched yet, next index 0.
1044 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1045 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary key/value state that is serialized below and used as the snapshot payload.
1047 Map<String, String> leadersSnapshot = new HashMap<>();
1048 leadersSnapshot.put("1", "A");
1049 leadersSnapshot.put("2", "B");
1050 leadersSnapshot.put("3", "C");
1052 // set the snapshot variables in replicatedlog
1053 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1054 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1055 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1057 ByteString bs = toByteString(leadersSnapshot);
1058 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1059 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1060 leader.setSnapshot(snapshot);
// NOTE(review): the reason for this fixed 1 second pause is not evident from the test itself - confirm.
1062 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1063 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1065 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1067 assertEquals(1, installSnapshot.getChunkIndex());
1068 assertEquals(3, installSnapshot.getTotalChunks());
1070 followerActor.underlyingActor().clear();
// Failure reply (success == false) with chunk index -1.
1072 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1073 FOLLOWER_ID, -1, false));
// Wait one configured heartbeat interval before delivering the heartbeat message by hand.
1075 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1076 TimeUnit.MILLISECONDS);
1078 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
// The first chunk is expected again.
1080 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1082 assertEquals(1, installSnapshot.getChunkIndex());
1083 assertEquals(3, installSnapshot.getTotalChunks());
/**
 * Checks the lastChunkHashCode carried by InstallSnapshot messages: the first chunk carries
 * LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE, and the second chunk carries
 * Arrays.hashCode of the first chunk's data.
 */
1087 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1088 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1090 MockRaftActorContext actorContext = createActorContextWithFollower();
1092 final int commitIndex = 3;
1093 final int snapshotIndex = 2;
1094 final int snapshotTerm = 1;
1095 final int currentTerm = 2;
// NOTE(review): the overridden chunk size is presumably small enough to split the serialized snapshot into
// the 3 chunks asserted below - confirm.
1097 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1099 public int getSnapshotChunkSize() {
1104 actorContext.setCommitIndex(commitIndex);
1106 leader = new Leader(actorContext);
// Place the follower at the very start of the log: nothing matched yet, next index 0.
1108 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1109 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary key/value state that is serialized below and used as the snapshot payload.
1111 Map<String, String> leadersSnapshot = new HashMap<>();
1112 leadersSnapshot.put("1", "A");
1113 leadersSnapshot.put("2", "B");
1114 leadersSnapshot.put("3", "C");
1116 // set the snapshot variables in replicatedlog
1117 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1118 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1119 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1121 ByteString bs = toByteString(leadersSnapshot);
1122 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1123 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1124 leader.setSnapshot(snapshot);
1126 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1128 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
// First chunk: there is no previous chunk, so the initial hash code constant is expected.
1130 assertEquals(1, installSnapshot.getChunkIndex());
1131 assertEquals(3, installSnapshot.getTotalChunks());
1132 assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1133 installSnapshot.getLastChunkHashCode().get().intValue());
// Remember the hash of the first chunk's bytes for comparison with the second message.
1135 int hashCode = Arrays.hashCode(installSnapshot.getData());
1137 followerActor.underlyingActor().clear();
// Acknowledge chunk 1 successfully so that chunk 2 is sent.
1139 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1140 FOLLOWER_ID, 1, true));
1142 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1144 assertEquals(2, installSnapshot.getChunkIndex());
1145 assertEquals(3, installSnapshot.getTotalChunks());
1146 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
/**
 * Exercises LeaderInstallSnapshotState directly: walks the serialized snapshot in steps of 50 bytes, checks
 * the length and index of each chunk returned by getNextChunk(), and finally compares the number of chunks
 * seen with getTotalChunks().
 */
1150 public void testFollowerToSnapshotLogic() {
1151 logStart("testFollowerToSnapshotLogic");
1153 MockRaftActorContext actorContext = createActorContext();
// NOTE(review): the overridden chunk size is presumably 50, matching the step of the loop below - confirm.
1155 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1157 public int getSnapshotChunkSize() {
1162 leader = new Leader(actorContext);
// Arbitrary key/value state that is serialized below and chunked.
1164 Map<String, String> leadersSnapshot = new HashMap<>();
1165 leadersSnapshot.put("1", "A");
1166 leadersSnapshot.put("2", "B");
1167 leadersSnapshot.put("3", "C");
1169 ByteString bs = toByteString(leadersSnapshot);
1170 byte[] barray = bs.toByteArray();
1172 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
1173 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
1174 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
// Sanity check: the byte array and the ByteString have the same length.
1176 assertEquals(bs.size(), barray.length);
// i is the start offset of the current chunk within the serialized bytes.
1179 for (int i=0; i < barray.length; i = i + 50) {
// Handles the final chunk, which may extend past the end of the array.
1183 if (i + 50 > barray.length) {
1187 byte[] chunk = fts.getNextChunk();
1188 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
1189 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
// Mark the chunk as sent successfully and move on unless this was the last one.
1191 fts.markSendStatus(true);
1192 if (!fts.isLastChunk(chunkIndex)) {
1193 fts.incrementChunkIndex();
1197 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
/**
 * Creates the behavior under test.
 *
 * @param actorContext the context handed to the new behavior
 * @return a new Leader constructed with the given context
 */
1201 protected Leader createBehavior(final RaftActorContext actorContext) {
1202 return new Leader(actorContext);
/**
 * Creates an actor context backed by leaderActor by delegating to createActorContext(ActorRef).
 *
 * @return the context produced by createActorContext(leaderActor)
 */
1206 protected MockRaftActorContext createActorContext() {
1207 return createActorContext(leaderActor);
/**
 * Creates an actor context with the id LEADER_ID for the given actor.
 *
 * @param actorRef the actor passed on to createActorContext(String, ActorRef)
 * @return the context produced by createActorContext(LEADER_ID, actorRef)
 */
1211 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1212 return createActorContext(LEADER_ID, actorRef);
/**
 * Creates the default actor context and sets its peer addresses to a single entry: FOLLOWER_ID mapped to the
 * path of followerActor.
 *
 * @return the context with the follower set as its only peer
 */
1215 private MockRaftActorContext createActorContextWithFollower() {
1216 MockRaftActorContext actorContext = createActorContext();
1217 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1218 followerActor.path().toString()).build());
1219 return actorContext;
/**
 * Builds a MockRaftActorContext for the given id and actor, configured with a 50 ms heartbeat interval, an
 * election timeout factor of 100000 and the test's payloadVersion.
 *
 * @param id the id passed to the MockRaftActorContext constructor
 * @param actorRef the actor passed to the MockRaftActorContext constructor
 */
1222 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1223 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1224 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
// Very large factor; presumably keeps election timeouts from firing during a test - confirm.
1225 configParams.setElectionTimeoutFactor(100000);
1226 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1227 context.setConfigParams(configParams);
1228 context.setPayloadVersion(payloadVersion);
/**
 * Creates an actor context for FOLLOWER_ID / followerActor whose only peer is LEADER_ID, addressed by the path
 * of leaderActor.
 *
 * @return the follower's context
 */
1232 private MockRaftActorContext createFollowerActorContextWithLeader() {
1233 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// A fresh config object is installed here, so only the election timeout factor below is set explicitly;
// the settings applied by createActorContext(String, ActorRef) are presumably superseded - confirm.
1234 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1235 followerConfig.setElectionTimeoutFactor(10000);
1236 followerActorContext.setConfigParams(followerConfig);
1237 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1238 return followerActorContext;
/**
 * Leader and follower are given identically built logs and the same commit index (1). Checks that the
 * Leader's initial AppendEntries carries leaderCommit 1, no entries and prevLogIndex 0, and that the
 * follower's AppendEntriesReply reports last log index 2 in term 1.
 */
1242 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1243 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1245 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1247 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Attach a real Follower behavior to followerActor so that it answers the Leader's messages.
1249 Follower follower = new Follower(followerActorContext);
1250 followerActor.underlyingActor().setBehavior(follower);
1251 followerActorContext.setCurrentBehavior(follower);
// Same single-peer mapping that createActorContextWithFollower() has already installed.
1253 Map<String, String> peerAddresses = new HashMap<>();
1254 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1256 leaderActorContext.setPeerAddresses(peerAddresses);
1258 leaderActorContext.getReplicatedLog().removeFrom(0);
// createEntries(0, 3, 1) presumably yields indices 0..2 in term 1, consistent with the last index/term
// asserted on the reply below - confirm.
1261 leaderActorContext.setReplicatedLog(
1262 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1264 leaderActorContext.setCommitIndex(1);
1266 followerActorContext.getReplicatedLog().removeFrom(0);
1268 // follower too has the exact same log entries and has the same commit index
1269 followerActorContext.setReplicatedLog(
1270 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1272 followerActorContext.setCommitIndex(1);
1274 leader = new Leader(leaderActorContext);
1275 leaderActorContext.setCurrentBehavior(leader);
// Initial message sent by the newly created Leader.
1277 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1279 assertEquals(1, appendEntries.getLeaderCommit());
1280 assertEquals(0, appendEntries.getEntries().size());
1281 assertEquals(0, appendEntries.getPrevLogIndex());
// The Follower's reply, collected at leaderActor.
1283 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1284 leaderActor, AppendEntriesReply.class);
1286 assertEquals(2, appendEntriesReply.getLogLastIndex());
1287 assertEquals(1, appendEntriesReply.getLogLastTerm());
1289 // follower returns its next index
// NOTE(review): the two assertions below repeat the pair directly above; a different check may have been
// intended here - confirm.
1290 assertEquals(2, appendEntriesReply.getLogLastIndex());
1291 assertEquals(1, appendEntriesReply.getLogLastTerm());
/**
 * Leader and follower are given identically built logs, but the follower's commit index (2) is ahead of the
 * Leader's (1). Checks that the initial AppendEntries carries leaderCommit 1 and prevLogIndex 0, that after
 * the Leader handles the follower's reply the next heartbeat carries leaderCommit 2 and prevLogIndex 2, and
 * that the follower's commit index is still 2 at the end.
 */
1297 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1298 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1300 MockRaftActorContext leaderActorContext = createActorContext();
1302 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1303 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
// Attach a real Follower behavior to followerActor so that it answers the Leader's messages.
1305 Follower follower = new Follower(followerActorContext);
1306 followerActor.underlyingActor().setBehavior(follower);
1307 followerActorContext.setCurrentBehavior(follower);
// The follower is the Leader's only peer.
1309 Map<String, String> leaderPeerAddresses = new HashMap<>();
1310 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1312 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1314 leaderActorContext.getReplicatedLog().removeFrom(0);
// createEntries(0, 3, 1) presumably yields indices 0..2 in term 1, consistent with the last index/term
// asserted on the replies below - confirm.
1316 leaderActorContext.setReplicatedLog(
1317 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1319 leaderActorContext.setCommitIndex(1);
1321 followerActorContext.getReplicatedLog().removeFrom(0);
1323 followerActorContext.setReplicatedLog(
1324 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1326 // follower has the same log entries but its commit index > leaders commit index
1327 followerActorContext.setCommitIndex(2);
1329 leader = new Leader(leaderActorContext);
1331 // Initial heartbeat
1332 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1334 assertEquals(1, appendEntries.getLeaderCommit());
1335 assertEquals(0, appendEntries.getEntries().size());
1336 assertEquals(0, appendEntries.getPrevLogIndex());
1338 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1339 leaderActor, AppendEntriesReply.class);
1341 assertEquals(2, appendEntriesReply.getLogLastIndex());
1342 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): this installs the Follower behavior on leaderActor, whereas sibling tests call
// setBehavior(leader) at the equivalent point - confirm this is intended.
1344 leaderActor.underlyingActor().setBehavior(follower);
// Feed the collected reply to the Leader by hand.
1345 leader.handleMessage(followerActor, appendEntriesReply);
// Discard everything collected so far so that only the next round of messages is examined.
1347 leaderActor.underlyingActor().clear();
1348 followerActor.underlyingActor().clear();
// Wait one configured heartbeat interval before delivering the heartbeat message by hand.
1350 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1351 TimeUnit.MILLISECONDS);
1353 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1355 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// The second heartbeat reflects the follower's position: leaderCommit 2, prevLogIndex 2, still no entries.
1357 assertEquals(2, appendEntries.getLeaderCommit());
1358 assertEquals(0, appendEntries.getEntries().size());
1359 assertEquals(2, appendEntries.getPrevLogIndex());
1361 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1363 assertEquals(2, appendEntriesReply.getLogLastIndex());
1364 assertEquals(1, appendEntriesReply.getLogLastTerm());
1366 assertEquals(2, followerActorContext.getCommitIndex());
/**
 * The Leader's log is built with createEntries(0, 3, 1) and committed/applied through index 2; the follower's
 * log is built with createEntries(0, 1, 1) and committed/applied through index 0. Checks that, after the
 * Leader handles the follower's reply to the initial AppendEntries, it sends the entries at indices 1 and 2
 * with prevLogIndex 0, the follower's next index becomes 3, and the follower applies both entries and ends
 * with commit index 2 and last log index 2.
 */
1372 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1373 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1375 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// Heartbeat interval of 1000 seconds; presumably so that no timer-driven heartbeat is sent during the
// test - confirm.
1376 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1377 new FiniteDuration(1000, TimeUnit.SECONDS));
1379 leaderActorContext.setReplicatedLog(
1380 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1381 long leaderCommitIndex = 2;
1382 leaderActorContext.setCommitIndex(leaderCommitIndex);
1383 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the entries at indices 1 and 2 for the data comparisons below.
1385 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1386 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1388 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1390 followerActorContext.setReplicatedLog(
1391 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1392 followerActorContext.setCommitIndex(0);
1393 followerActorContext.setLastApplied(0);
// Attach a real Follower behavior to followerActor so that it answers the Leader's messages.
1395 Follower follower = new Follower(followerActorContext);
1396 followerActor.underlyingActor().setBehavior(follower);
1398 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply to it, then clear both collectors.
1400 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1401 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1403 MessageCollectorActor.clearMessages(followerActor);
1404 MessageCollectorActor.clearMessages(leaderActor);
1406 // Verify initial AppendEntries sent with the leader's current commit index.
1407 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1408 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1409 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
// From here on leaderActor is given the leader behavior, then the captured reply is fed to the Leader by hand.
1411 leaderActor.underlyingActor().setBehavior(leader);
1413 leader.handleMessage(followerActor, appendEntriesReply);
// Wait for one further reply at leaderActor, then inspect the AppendEntries the follower received.
1415 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1416 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1418 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1419 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1420 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1422 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1423 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1424 appendEntries.getEntries().get(0).getData());
1425 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1426 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1427 appendEntries.getEntries().get(1).getData());
1429 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1430 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
// The follower is expected to emit one ApplyState per newly received entry.
1432 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1434 ApplyState applyState = applyStateList.get(0);
1435 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1436 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1437 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1438 applyState.getReplicatedLogEntry().getData());
1440 applyState = applyStateList.get(1);
1441 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1442 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1443 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1444 applyState.getReplicatedLogEntry().getData());
1446 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1447 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
/**
 * The Leader's log is built with createEntries(0, 2, 1) and committed/applied through index 1; the follower
 * starts with an empty log and commit/last-applied index -1. Checks that, after the Leader handles the
 * follower's reply to the initial AppendEntries, it sends the entries at indices 0 and 1 with prevLogIndex -1,
 * the follower's next index becomes 2, and the follower applies both entries and ends with commit index 1 and
 * last log index 1.
 */
1451 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1452 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1454 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// Heartbeat interval of 1000 seconds; presumably so that no timer-driven heartbeat is sent during the
// test - confirm.
1455 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1456 new FiniteDuration(1000, TimeUnit.SECONDS));
1458 leaderActorContext.setReplicatedLog(
1459 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1460 long leaderCommitIndex = 1;
1461 leaderActorContext.setCommitIndex(leaderCommitIndex);
1462 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the entries at indices 0 and 1 for the data comparisons below.
1464 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1465 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1467 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// Log built without any entries.
1469 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1470 followerActorContext.setCommitIndex(-1);
1471 followerActorContext.setLastApplied(-1);
// Attach a real Follower behavior to followerActor so that it answers the Leader's messages.
1473 Follower follower = new Follower(followerActorContext);
1474 followerActor.underlyingActor().setBehavior(follower);
1475 followerActorContext.setCurrentBehavior(follower);
1477 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply to it, then clear both collectors.
1479 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1480 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1482 MessageCollectorActor.clearMessages(followerActor);
1483 MessageCollectorActor.clearMessages(leaderActor);
1485 // Verify initial AppendEntries sent with the leader's current commit index.
1486 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1487 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1488 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
// From here on leaderActor is given the leader behavior, then the captured reply is fed to the Leader by hand.
1490 leaderActor.underlyingActor().setBehavior(leader);
1491 leaderActorContext.setCurrentBehavior(leader);
1493 leader.handleMessage(followerActor, appendEntriesReply);
// Wait for one further reply at leaderActor, then inspect the AppendEntries the follower received.
1495 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1496 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1498 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1499 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1500 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1502 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1503 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1504 appendEntries.getEntries().get(0).getData());
1505 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1506 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1507 appendEntries.getEntries().get(1).getData());
1509 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1510 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
// The follower is expected to emit one ApplyState per newly received entry.
1512 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1514 ApplyState applyState = applyStateList.get(0);
1515 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1516 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1517 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1518 applyState.getReplicatedLogEntry().getData());
1520 applyState = applyStateList.get(1);
1521 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1522 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1523 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1524 applyState.getReplicatedLogEntry().getData());
1526 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1527 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1531 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1532 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1534 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1535 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1536 new FiniteDuration(1000, TimeUnit.SECONDS));
1538 leaderActorContext.setReplicatedLog(
1539 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1540 long leaderCommitIndex = 1;
1541 leaderActorContext.setCommitIndex(leaderCommitIndex);
1542 leaderActorContext.setLastApplied(leaderCommitIndex);
1544 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1545 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1547 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1549 followerActorContext.setReplicatedLog(
1550 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1551 followerActorContext.setCommitIndex(-1);
1552 followerActorContext.setLastApplied(-1);
1554 Follower follower = new Follower(followerActorContext);
1555 followerActor.underlyingActor().setBehavior(follower);
1556 followerActorContext.setCurrentBehavior(follower);
1558 leader = new Leader(leaderActorContext);
1560 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1561 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1563 MessageCollectorActor.clearMessages(followerActor);
1564 MessageCollectorActor.clearMessages(leaderActor);
1566 // Verify initial AppendEntries sent with the leader's current commit index.
1567 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1568 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1569 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1571 leaderActor.underlyingActor().setBehavior(leader);
1572 leaderActorContext.setCurrentBehavior(leader);
1574 leader.handleMessage(followerActor, appendEntriesReply);
1576 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1577 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1579 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1580 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1581 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1583 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1584 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1585 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1586 appendEntries.getEntries().get(0).getData());
1587 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1588 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1589 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1590 appendEntries.getEntries().get(1).getData());
1592 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1593 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1595 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1597 ApplyState applyState = applyStateList.get(0);
1598 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1599 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1600 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1601 applyState.getReplicatedLogEntry().getData());
1603 applyState = applyStateList.get(1);
1604 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1605 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1606 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1607 applyState.getReplicatedLogEntry().getData());
1609 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1610 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1611 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1615 public void testHandleAppendEntriesReplyWithNewerTerm(){
1616 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1618 MockRaftActorContext leaderActorContext = createActorContext();
1619 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1620 new FiniteDuration(10000, TimeUnit.SECONDS));
1622 leaderActorContext.setReplicatedLog(
1623 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1625 leader = new Leader(leaderActorContext);
1626 leaderActor.underlyingActor().setBehavior(leader);
1627 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1629 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1631 assertEquals(false, appendEntriesReply.isSuccess());
1632 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1634 MessageCollectorActor.clearMessages(leaderActor);
1638 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1639 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1641 MockRaftActorContext leaderActorContext = createActorContext();
1642 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1643 new FiniteDuration(10000, TimeUnit.SECONDS));
1645 leaderActorContext.setReplicatedLog(
1646 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1647 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1649 leader = new Leader(leaderActorContext);
1650 leaderActor.underlyingActor().setBehavior(leader);
1651 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1653 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1655 assertEquals(false, appendEntriesReply.isSuccess());
1656 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1658 MessageCollectorActor.clearMessages(leaderActor);
1662 public void testHandleAppendEntriesReplySuccess() throws Exception {
1663 logStart("testHandleAppendEntriesReplySuccess");
1665 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1667 leaderActorContext.setReplicatedLog(
1668 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1670 leaderActorContext.setCommitIndex(1);
1671 leaderActorContext.setLastApplied(1);
1672 leaderActorContext.getTermInformation().update(1, "leader");
1674 leader = new Leader(leaderActorContext);
1676 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1678 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1679 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1681 short payloadVersion = 5;
1682 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1684 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1686 assertEquals(RaftState.Leader, raftActorBehavior.state());
1688 assertEquals(2, leaderActorContext.getCommitIndex());
1690 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1691 leaderActor, ApplyJournalEntries.class);
1693 assertEquals(2, leaderActorContext.getLastApplied());
1695 assertEquals(2, applyJournalEntries.getToIndex());
1697 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1700 assertEquals(1,applyStateList.size());
1702 ApplyState applyState = applyStateList.get(0);
1704 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1706 assertEquals(2, followerInfo.getMatchIndex());
1707 assertEquals(3, followerInfo.getNextIndex());
1708 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1709 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1713 public void testHandleAppendEntriesReplyUnknownFollower(){
1714 logStart("testHandleAppendEntriesReplyUnknownFollower");
1716 MockRaftActorContext leaderActorContext = createActorContext();
1718 leader = new Leader(leaderActorContext);
1720 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1722 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1724 assertEquals(RaftState.Leader, raftActorBehavior.state());
1728 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1729 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1731 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1732 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1733 new FiniteDuration(1000, TimeUnit.SECONDS));
1734 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1736 leaderActorContext.setReplicatedLog(
1737 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1738 long leaderCommitIndex = 3;
1739 leaderActorContext.setCommitIndex(leaderCommitIndex);
1740 leaderActorContext.setLastApplied(leaderCommitIndex);
1742 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1743 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1744 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1745 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1747 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1749 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1750 followerActorContext.setCommitIndex(-1);
1751 followerActorContext.setLastApplied(-1);
1753 Follower follower = new Follower(followerActorContext);
1754 followerActor.underlyingActor().setBehavior(follower);
1755 followerActorContext.setCurrentBehavior(follower);
1757 leader = new Leader(leaderActorContext);
1759 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1760 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1762 MessageCollectorActor.clearMessages(followerActor);
1763 MessageCollectorActor.clearMessages(leaderActor);
1765 // Verify initial AppendEntries sent with the leader's current commit index.
1766 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1767 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1768 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1770 leaderActor.underlyingActor().setBehavior(leader);
1771 leaderActorContext.setCurrentBehavior(leader);
1773 leader.handleMessage(followerActor, appendEntriesReply);
1775 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1776 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1778 appendEntries = appendEntriesList.get(0);
1779 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1780 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1781 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1783 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1784 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1785 appendEntries.getEntries().get(0).getData());
1786 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1787 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1788 appendEntries.getEntries().get(1).getData());
1790 appendEntries = appendEntriesList.get(1);
1791 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1792 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1793 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1795 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1796 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1797 appendEntries.getEntries().get(0).getData());
1798 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1799 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1800 appendEntries.getEntries().get(1).getData());
1802 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1803 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1805 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1807 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1808 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1812 public void testHandleRequestVoteReply(){
1813 logStart("testHandleRequestVoteReply");
1815 MockRaftActorContext leaderActorContext = createActorContext();
1817 leader = new Leader(leaderActorContext);
1819 // Should be a no-op.
1820 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1821 new RequestVoteReply(1, true));
1823 assertEquals(RaftState.Leader, raftActorBehavior.state());
1825 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1827 assertEquals(RaftState.Leader, raftActorBehavior.state());
1831 public void testIsolatedLeaderCheckNoFollowers() {
1832 logStart("testIsolatedLeaderCheckNoFollowers");
1834 MockRaftActorContext leaderActorContext = createActorContext();
1836 leader = new Leader(leaderActorContext);
1837 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1838 assertTrue(behavior instanceof Leader);
1842 public void testIsolatedLeaderCheckNoVotingFollowers() {
1843 logStart("testIsolatedLeaderCheckNoVotingFollowers");
1845 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1846 Follower follower = new Follower(followerActorContext);
1847 followerActor.underlyingActor().setBehavior(follower);
1849 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1850 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1851 new FiniteDuration(1000, TimeUnit.SECONDS));
1852 leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1854 leader = new Leader(leaderActorContext);
1855 leader.getFollower(FOLLOWER_ID).markFollowerActive();
1856 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1857 assertTrue("Expected Leader", behavior instanceof Leader);
1860 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1861 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1862 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1864 MockRaftActorContext leaderActorContext = createActorContext();
1866 Map<String, String> peerAddresses = new HashMap<>();
1867 peerAddresses.put("follower-1", followerActor1.path().toString());
1868 peerAddresses.put("follower-2", followerActor2.path().toString());
1870 leaderActorContext.setPeerAddresses(peerAddresses);
1871 leaderActorContext.setRaftPolicy(raftPolicy);
1873 leader = new Leader(leaderActorContext);
1875 leader.markFollowerActive("follower-1");
1876 leader.markFollowerActive("follower-2");
1877 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1878 assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
1880 // kill 1 follower and verify if that got killed
1881 final JavaTestKit probe = new JavaTestKit(getSystem());
1882 probe.watch(followerActor1);
1883 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1884 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1885 assertEquals(termMsg1.getActor(), followerActor1);
1887 leader.markFollowerInActive("follower-1");
1888 leader.markFollowerActive("follower-2");
1889 behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1890 assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
1892 // kill 2nd follower and leader should change to Isolated leader
1893 followerActor2.tell(PoisonPill.getInstance(), null);
1894 probe.watch(followerActor2);
1895 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1896 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1897 assertEquals(termMsg2.getActor(), followerActor2);
1899 leader.markFollowerInActive("follower-2");
1900 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1904 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1905 logStart("testIsolatedLeaderCheckTwoFollowers");
1907 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1909 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1910 behavior instanceof IsolatedLeader);
1914 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1915 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1917 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1919 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1920 behavior instanceof Leader);
1924 public void testLaggingFollowerStarvation() throws Exception {
1925 logStart("testLaggingFollowerStarvation");
1926 new JavaTestKit(getSystem()) {{
1927 String leaderActorId = actorFactory.generateActorId("leader");
1928 String follower1ActorId = actorFactory.generateActorId("follower");
1929 String follower2ActorId = actorFactory.generateActorId("follower");
1931 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1932 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1933 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1934 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1936 MockRaftActorContext leaderActorContext =
1937 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1939 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1940 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1941 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1943 leaderActorContext.setConfigParams(configParams);
1945 leaderActorContext.setReplicatedLog(
1946 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1948 Map<String, String> peerAddresses = new HashMap<>();
1949 peerAddresses.put(follower1ActorId,
1950 follower1Actor.path().toString());
1951 peerAddresses.put(follower2ActorId,
1952 follower2Actor.path().toString());
1954 leaderActorContext.setPeerAddresses(peerAddresses);
1955 leaderActorContext.getTermInformation().update(1, leaderActorId);
1957 RaftActorBehavior leader = createBehavior(leaderActorContext);
1959 leaderActor.underlyingActor().setBehavior(leader);
1961 for(int i=1;i<6;i++) {
1962 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1963 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1964 assertTrue(newBehavior == leader);
1965 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1968 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1969 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1971 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1972 heartbeats.size() > 1);
1974 // Check if follower-2 got AppendEntries during this time and was not starved
1975 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1977 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1978 appendEntries.size() > 1);
1984 public void testReplicationConsensusWithNonVotingFollower() {
1985 logStart("testReplicationConsensusWithNonVotingFollower");
1987 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1988 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1989 new FiniteDuration(1000, TimeUnit.SECONDS));
1991 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1992 leaderActorContext.setCommitIndex(-1);
1994 String nonVotingFollowerId = "nonvoting-follower";
1995 TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1996 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1998 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
2000 leader = new Leader(leaderActorContext);
2001 leaderActorContext.setCurrentBehavior(leader);
2003 // Ignore initial heartbeats
2004 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2005 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2007 MessageCollectorActor.clearMessages(followerActor);
2008 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2009 MessageCollectorActor.clearMessages(leaderActor);
2011 // Send a Replicate message and wait for AppendEntries.
2012 sendReplicate(leaderActorContext, 0);
2014 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2015 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2017 // Send reply only from the voting follower and verify consensus via ApplyState.
2018 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2020 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2022 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2024 MessageCollectorActor.clearMessages(followerActor);
2025 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2026 MessageCollectorActor.clearMessages(leaderActor);
2028 // Send another Replicate message
2029 sendReplicate(leaderActorContext, 1);
2031 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2032 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2033 AppendEntries.class);
2034 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2035 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2037 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2038 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2040 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2042 // Send reply from the voting follower and verify consensus.
2043 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2045 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2049 public void testTransferLeadershipWithFollowerInSync() {
2050 logStart("testTransferLeadershipWithFollowerInSync");
2052 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2053 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2054 new FiniteDuration(1000, TimeUnit.SECONDS));
2055 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2057 leader = new Leader(leaderActorContext);
2058 leaderActorContext.setCurrentBehavior(leader);
2060 // Initial heartbeat
2061 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2062 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2063 MessageCollectorActor.clearMessages(followerActor);
2065 sendReplicate(leaderActorContext, 0);
2066 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2068 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2069 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2070 MessageCollectorActor.clearMessages(followerActor);
2072 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2073 leader.transferLeadership(mockTransferCohort);
2075 verify(mockTransferCohort, never()).transferComplete();
2076 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2077 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2079 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2080 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2082 // Leader should force an election timeout
2083 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2085 verify(mockTransferCohort).transferComplete();
2089 public void testTransferLeadershipWithEmptyLog() {
2090 logStart("testTransferLeadershipWithEmptyLog");
2092 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2093 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2094 new FiniteDuration(1000, TimeUnit.SECONDS));
2095 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2097 leader = new Leader(leaderActorContext);
2098 leaderActorContext.setCurrentBehavior(leader);
2100 // Initial heartbeat
2101 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2102 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2103 MessageCollectorActor.clearMessages(followerActor);
2105 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2106 leader.transferLeadership(mockTransferCohort);
2108 verify(mockTransferCohort, never()).transferComplete();
2109 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2110 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2112 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2113 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2115 // Leader should force an election timeout
2116 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2118 verify(mockTransferCohort).transferComplete();
2122 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2123 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2125 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2126 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2127 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2129 leader = new Leader(leaderActorContext);
2130 leaderActorContext.setCurrentBehavior(leader);
2132 // Initial heartbeat
2133 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2134 MessageCollectorActor.clearMessages(followerActor);
2136 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2137 leader.transferLeadership(mockTransferCohort);
2139 verify(mockTransferCohort, never()).transferComplete();
2141 // Sync up the follower.
2142 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2143 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2144 MessageCollectorActor.clearMessages(followerActor);
2146 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2147 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2148 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2149 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2150 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2152 // Leader should force an election timeout
2153 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2155 verify(mockTransferCohort).transferComplete();
2159 public void testTransferLeadershipWithFollowerSyncTimeout() {
2160 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2162 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2163 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2164 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2165 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2166 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2168 leader = new Leader(leaderActorContext);
2169 leaderActorContext.setCurrentBehavior(leader);
2171 // Initial heartbeat
2172 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2173 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2174 MessageCollectorActor.clearMessages(followerActor);
2176 sendReplicate(leaderActorContext, 0);
2177 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2179 MessageCollectorActor.clearMessages(followerActor);
2181 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2182 leader.transferLeadership(mockTransferCohort);
2184 verify(mockTransferCohort, never()).transferComplete();
2186 // Send heartbeats to time out the transfer.
2187 for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2188 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2189 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2190 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2193 verify(mockTransferCohort).abortTransfer();
2194 verify(mockTransferCohort, never()).transferComplete();
2195 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2199 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2200 ActorRef actorRef, RaftRPC rpc) throws Exception {
2201 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2202 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2205 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2207 private final long electionTimeOutIntervalMillis;
2208 private final int snapshotChunkSize;
2210 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2212 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2213 this.snapshotChunkSize = snapshotChunkSize;
2217 public FiniteDuration getElectionTimeOutInterval() {
2218 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2222 public int getSnapshotChunkSize() {
2223 return snapshotChunkSize;