2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.testkit.JavaTestKit;
26 import akka.testkit.TestActorRef;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.util.concurrent.Uninterruptibles;
29 import com.google.protobuf.ByteString;
30 import java.util.Arrays;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.List;
35 import java.util.concurrent.TimeUnit;
36 import org.junit.After;
37 import org.junit.Test;
38 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
39 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
40 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
41 import org.opendaylight.controller.cluster.raft.RaftActorContext;
42 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
43 import org.opendaylight.controller.cluster.raft.RaftState;
44 import org.opendaylight.controller.cluster.raft.RaftVersions;
45 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
46 import org.opendaylight.controller.cluster.raft.Snapshot;
47 import org.opendaylight.controller.cluster.raft.VotingState;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
49 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
50 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
51 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
54 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
57 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
59 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
60 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
61 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
62 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
63 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
64 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
65 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
66 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
67 import org.opendaylight.yangtools.concepts.Identifier;
68 import scala.concurrent.duration.FiniteDuration;
70 public class LeaderTest extends AbstractLeaderTest<Leader> {
// Identifiers used for the follower and leader peers throughout these tests.
72 static final String FOLLOWER_ID = "follower";
73 public static final String LEADER_ID = "leader";
// Test actor created from ForwardMessageToBehaviorActor; passed as the sender when messages are
// handed to the leader and queried for collected ApplyState messages.
75 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
76 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
// Test actor standing in for the follower; the tests inspect the messages it has collected.
78 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
79 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
// Leader behavior under test; assigned by each test method.
81 private Leader leader;
// Payload version set on the actor context and expected back in AppendEntries messages.
82 private final short payloadVersion = 5;
// Per-test cleanup hook. NOTE(review): the body (and any annotation) of this method is not
// visible in this extract - confirm against the full file before changing it.
86 public void tearDown() throws Exception {
95 public void testHandleMessageForUnknownMessage() throws Exception {
96 logStart("testHandleMessageForUnknownMessage");
98 leader = new Leader(createActorContext());
100 // handle message should null when it receives an unknown message
101 assertNull(leader.handleMessage(followerActor, "foo"));
105 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
106 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
108 MockRaftActorContext actorContext = createActorContextWithFollower();
109 actorContext.setCommitIndex(-1);
110 actorContext.setPayloadVersion(payloadVersion);
113 actorContext.getTermInformation().update(term, "");
115 leader = new Leader(actorContext);
116 actorContext.setCurrentBehavior(leader);
118 // Leader should send an immediate heartbeat with no entries as follower is inactive.
119 final long lastIndex = actorContext.getReplicatedLog().lastIndex();
120 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
121 assertEquals("getTerm", term, appendEntries.getTerm());
122 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
123 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
124 assertEquals("Entries size", 0, appendEntries.getEntries().size());
125 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
127 // The follower would normally reply - simulate that explicitly here.
128 leader.handleMessage(followerActor, new AppendEntriesReply(
129 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
130 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
132 followerActor.underlyingActor().clear();
134 // Sleep for the heartbeat interval so AppendEntries is sent.
135 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
136 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
138 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
140 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
141 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
142 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
143 assertEquals("Entries size", 1, appendEntries.getEntries().size());
144 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
145 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
146 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
150 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
151 return sendReplicate(actorContext, 1, index);
154 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
155 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
156 SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
157 actorContext.getReplicatedLog().append(newEntry);
158 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
162 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
163 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
165 MockRaftActorContext actorContext = createActorContextWithFollower();
168 actorContext.getTermInformation().update(term, "");
170 leader = new Leader(actorContext);
172 // Leader will send an immediate heartbeat - ignore it.
173 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
175 // The follower would normally reply - simulate that explicitly here.
176 long lastIndex = actorContext.getReplicatedLog().lastIndex();
177 leader.handleMessage(followerActor, new AppendEntriesReply(
178 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
179 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
181 followerActor.underlyingActor().clear();
183 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
185 // State should not change
186 assertTrue(raftBehavior instanceof Leader);
188 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
189 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
190 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
191 assertEquals("Entries size", 1, appendEntries.getEntries().size());
192 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
193 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
194 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
195 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
199 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
200 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
202 MockRaftActorContext actorContext = createActorContextWithFollower();
203 actorContext.setCommitIndex(-1);
204 actorContext.setLastApplied(-1);
206 // The raft context is initialized with a couple log entries. However the commitIndex
207 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
208 // committed and applied. Now it regains leadership with a higher term (2).
209 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
210 long newTerm = prevTerm + 1;
211 actorContext.getTermInformation().update(newTerm, "");
213 leader = new Leader(actorContext);
214 actorContext.setCurrentBehavior(leader);
216 // Leader will send an immediate heartbeat - ignore it.
217 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
219 // The follower replies with the leader's current last index and term, simulating that it is
220 // up to date with the leader.
221 long lastIndex = actorContext.getReplicatedLog().lastIndex();
222 leader.handleMessage(followerActor, new AppendEntriesReply(
223 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
225 // The commit index should not get updated even though consensus was reached. This is b/c the
226 // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
227 // from previous terms by counting replicas".
228 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
230 followerActor.underlyingActor().clear();
232 // Now replicate a new entry with the new term 2.
233 long newIndex = lastIndex + 1;
234 sendReplicate(actorContext, newTerm, newIndex);
236 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
237 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
238 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
239 assertEquals("Entries size", 1, appendEntries.getEntries().size());
240 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
241 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
242 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
244 // The follower replies with success. The leader should now update the commit index to the new index
245 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
246 // prior entries are committed indirectly".
247 leader.handleMessage(followerActor, new AppendEntriesReply(
248 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
250 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
254 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
255 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
257 MockRaftActorContext actorContext = createActorContextWithFollower();
258 actorContext.setRaftPolicy(createRaftPolicy(true, true));
261 actorContext.getTermInformation().update(term, "");
263 leader = new Leader(actorContext);
265 // Leader will send an immediate heartbeat - ignore it.
266 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
268 // The follower would normally reply - simulate that explicitly here.
269 long lastIndex = actorContext.getReplicatedLog().lastIndex();
270 leader.handleMessage(followerActor, new AppendEntriesReply(
271 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
272 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
274 followerActor.underlyingActor().clear();
276 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
278 // State should not change
279 assertTrue(raftBehavior instanceof Leader);
281 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
282 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
283 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
284 assertEquals("Entries size", 1, appendEntries.getEntries().size());
285 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
286 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
287 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
288 assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
292 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
293 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
295 MockRaftActorContext actorContext = createActorContextWithFollower();
296 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
298 public FiniteDuration getHeartBeatInterval() {
299 return FiniteDuration.apply(5, TimeUnit.SECONDS);
304 actorContext.getTermInformation().update(term, "");
306 leader = new Leader(actorContext);
308 // Leader will send an immediate heartbeat - ignore it.
309 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
311 // The follower would normally reply - simulate that explicitly here.
312 long lastIndex = actorContext.getReplicatedLog().lastIndex();
313 leader.handleMessage(followerActor, new AppendEntriesReply(
314 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
315 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
317 followerActor.underlyingActor().clear();
319 for (int i = 0; i < 5; i++) {
320 sendReplicate(actorContext, lastIndex + i + 1);
323 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
324 // We expect only 1 message to be sent because of two reasons,
325 // - an append entries reply was not received
326 // - the heartbeat interval has not expired
327 // In this scenario if multiple messages are sent they would likely be duplicates
328 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
332 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
333 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
335 MockRaftActorContext actorContext = createActorContextWithFollower();
336 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
338 public FiniteDuration getHeartBeatInterval() {
339 return FiniteDuration.apply(5, TimeUnit.SECONDS);
344 actorContext.getTermInformation().update(term, "");
346 leader = new Leader(actorContext);
348 // Leader will send an immediate heartbeat - ignore it.
349 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
351 // The follower would normally reply - simulate that explicitly here.
352 long lastIndex = actorContext.getReplicatedLog().lastIndex();
353 leader.handleMessage(followerActor, new AppendEntriesReply(
354 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
355 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
357 followerActor.underlyingActor().clear();
359 for (int i = 0; i < 3; i++) {
360 sendReplicate(actorContext, lastIndex + i + 1);
361 leader.handleMessage(followerActor, new AppendEntriesReply(
362 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
366 for (int i = 3; i < 5; i++) {
367 sendReplicate(actorContext, lastIndex + i + 1);
370 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
371 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
372 // get sent to the follower - but not the 5th
373 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
375 for (int i = 0; i < 4; i++) {
376 long expected = allMessages.get(i).getEntries().get(0).getIndex();
377 assertEquals(expected, i + 2);
382 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
383 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
385 MockRaftActorContext actorContext = createActorContextWithFollower();
386 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
388 public FiniteDuration getHeartBeatInterval() {
389 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
394 actorContext.getTermInformation().update(term, "");
396 leader = new Leader(actorContext);
398 // Leader will send an immediate heartbeat - ignore it.
399 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
401 // The follower would normally reply - simulate that explicitly here.
402 long lastIndex = actorContext.getReplicatedLog().lastIndex();
403 leader.handleMessage(followerActor, new AppendEntriesReply(
404 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
405 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
407 followerActor.underlyingActor().clear();
409 sendReplicate(actorContext, lastIndex + 1);
411 // Wait slightly longer than heartbeat duration
412 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
414 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
416 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
417 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
419 assertEquals(1, allMessages.get(0).getEntries().size());
420 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
421 assertEquals(1, allMessages.get(1).getEntries().size());
422 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
427 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
428 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
430 MockRaftActorContext actorContext = createActorContextWithFollower();
431 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
433 public FiniteDuration getHeartBeatInterval() {
434 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
439 actorContext.getTermInformation().update(term, "");
441 leader = new Leader(actorContext);
443 // Leader will send an immediate heartbeat - ignore it.
444 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
446 // The follower would normally reply - simulate that explicitly here.
447 long lastIndex = actorContext.getReplicatedLog().lastIndex();
448 leader.handleMessage(followerActor, new AppendEntriesReply(
449 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
450 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
452 followerActor.underlyingActor().clear();
454 for (int i = 0; i < 3; i++) {
455 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
456 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
459 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
460 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
464 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
465 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
467 MockRaftActorContext actorContext = createActorContextWithFollower();
468 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
470 public FiniteDuration getHeartBeatInterval() {
471 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
476 actorContext.getTermInformation().update(term, "");
478 leader = new Leader(actorContext);
480 // Leader will send an immediate heartbeat - ignore it.
481 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
483 // The follower would normally reply - simulate that explicitly here.
484 long lastIndex = actorContext.getReplicatedLog().lastIndex();
485 leader.handleMessage(followerActor, new AppendEntriesReply(
486 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
487 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
489 followerActor.underlyingActor().clear();
491 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
492 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
493 sendReplicate(actorContext, lastIndex + 1);
495 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
496 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
498 assertEquals(0, allMessages.get(0).getEntries().size());
499 assertEquals(1, allMessages.get(1).getEntries().size());
504 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
505 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
507 MockRaftActorContext actorContext = createActorContext();
509 leader = new Leader(actorContext);
511 actorContext.setLastApplied(0);
513 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
514 long term = actorContext.getTermInformation().getCurrentTerm();
515 ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
516 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
518 actorContext.getReplicatedLog().append(newEntry);
520 final Identifier id = new MockIdentifier("state-id");
521 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
522 new Replicate(leaderActor, id, newEntry, true));
524 // State should not change
525 assertTrue(raftBehavior instanceof Leader);
527 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
529 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
530 // one since lastApplied state is 0.
531 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
532 leaderActor, ApplyState.class);
533 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
535 for (int i = 0; i <= newLogIndex - 1; i++ ) {
536 ApplyState applyState = applyStateList.get(i);
537 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
538 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
541 ApplyState last = applyStateList.get((int) newLogIndex - 1);
542 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
543 assertEquals("getIdentifier", id, last.getIdentifier());
// Verifies heartbeat behavior while a snapshot install to the follower is in progress: the first
// SendHeartBeat yields an AppendEntries with no entries, and once the install state is marked as
// successfully sent, the next SendHeartBeat yields an InstallSnapshot whose last included index
// equals the commit index.
547 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
548 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
550 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Snapshot content; serialized below via toByteString and handed to the leader.
552 Map<String, String> leadersSnapshot = new HashMap<>();
553 leadersSnapshot.put("1", "A");
554 leadersSnapshot.put("2", "B");
555 leadersSnapshot.put("3", "C");
// Remove every entry from the leader's replicated log.
558 actorContext.getReplicatedLog().removeFrom(0);
560 final int commitIndex = 3;
561 final int snapshotIndex = 2;
562 final int snapshotTerm = 1;
564 // set the snapshot variables in replicatedlog
565 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
566 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
567 actorContext.setCommitIndex(commitIndex);
568 //set follower timeout to 2 mins, helps during debugging
569 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
571 leader = new Leader(actorContext);
// Put the follower at the very beginning of the log: nothing matched, next index 0.
573 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
574 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
576 //update follower timestamp
577 leader.markFollowerActive(FOLLOWER_ID);
// Give the leader the serialized snapshot and attach an install-snapshot state holding the same
// bytes to the follower.
579 ByteString bs = toByteString(leadersSnapshot);
580 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
581 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
582 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
583 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
584 fts.setSnapshotBytes(bs);
585 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
587 //send first chunk and no InstallSnapshotReply received yet
// NOTE(review): this extract appears to have dropped a line between the comment above and the
// call below (presumably the statement that produces the first chunk) - confirm against the
// full file before editing this section.
589 fts.incrementChunkIndex();
// Wait one heartbeat interval before triggering the heartbeat.
591 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
592 TimeUnit.MILLISECONDS);
594 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
596 AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
598 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
600 //InstallSnapshotReply received
601 fts.markSendStatus(true);
603 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
605 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
607 assertEquals(commitIndex, is.getLastIncludedIndex());
611 public void testSendAppendEntriesSnapshotScenario() throws Exception {
612 logStart("testSendAppendEntriesSnapshotScenario");
614 final MockRaftActorContext actorContext = createActorContextWithFollower();
616 Map<String, String> leadersSnapshot = new HashMap<>();
617 leadersSnapshot.put("1", "A");
618 leadersSnapshot.put("2", "B");
619 leadersSnapshot.put("3", "C");
622 actorContext.getReplicatedLog().removeFrom(0);
624 final int followersLastIndex = 2;
625 final int snapshotIndex = 3;
626 final int newEntryIndex = 4;
627 final int snapshotTerm = 1;
628 final int currentTerm = 2;
630 // set the snapshot variables in replicatedlog
631 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
632 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
633 actorContext.setCommitIndex(followersLastIndex);
635 leader = new Leader(actorContext);
637 // Leader will send an immediate heartbeat - ignore it.
638 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
641 SimpleReplicatedLogEntry entry =
642 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
643 new MockRaftActorContext.MockPayload("D"));
645 actorContext.getReplicatedLog().append(entry);
647 //update follower timestamp
648 leader.markFollowerActive(FOLLOWER_ID);
650 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
651 RaftActorBehavior raftBehavior = leader.handleMessage(
652 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
654 assertTrue(raftBehavior instanceof Leader);
656 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
660 public void testInitiateInstallSnapshot() throws Exception {
661 logStart("testInitiateInstallSnapshot");
663 MockRaftActorContext actorContext = createActorContextWithFollower();
666 actorContext.getReplicatedLog().removeFrom(0);
668 final int followersLastIndex = 2;
669 final int snapshotIndex = 3;
670 final int newEntryIndex = 4;
671 final int snapshotTerm = 1;
672 final int currentTerm = 2;
674 // set the snapshot variables in replicatedlog
675 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
676 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
677 actorContext.setLastApplied(3);
678 actorContext.setCommitIndex(followersLastIndex);
680 leader = new Leader(actorContext);
682 // Leader will send an immediate heartbeat - ignore it.
683 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
685 // set the snapshot as absent and check if capture-snapshot is invoked.
686 leader.setSnapshot(null);
689 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
690 new MockRaftActorContext.MockPayload("D"));
692 actorContext.getReplicatedLog().append(entry);
694 //update follower timestamp
695 leader.markFollowerActive(FOLLOWER_ID);
697 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
699 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
701 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
703 assertTrue(cs.isInstallSnapshotInitiated());
704 assertEquals(3, cs.getLastAppliedIndex());
705 assertEquals(1, cs.getLastAppliedTerm());
706 assertEquals(4, cs.getLastIndex());
707 assertEquals(2, cs.getLastTerm());
709 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
710 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
712 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
// Verifies that a failed AppendEntriesReply from the follower makes the leader start a snapshot capture that is
// flagged for install, and that a subsequent Replicate leaves the in-progress CaptureSnapshot instance untouched.
716 public void testInitiateForceInstallSnapshot() throws Exception {
717 logStart("testInitiateForceInstallSnapshot");
719 MockRaftActorContext actorContext = createActorContextWithFollower();
721 final int followersLastIndex = 2;
722 final int snapshotIndex = -1;
723 final int newEntryIndex = 4;
724 final int snapshotTerm = -1;
725 final int currentTerm = 2;
727 // set the snapshot variables in replicatedlog
728 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
729 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
730 actorContext.setLastApplied(3);
731 actorContext.setCommitIndex(followersLastIndex);
// Start from an empty log; the entries used by this test are appended further down.
733 actorContext.getReplicatedLog().removeFrom(0);
735 leader = new Leader(actorContext);
736 actorContext.setCurrentBehavior(leader);
738 // Leader will send an immediate heartbeat - ignore it.
739 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
741 // set the snapshot as absent and check if capture-snapshot is invoked.
742 leader.setSnapshot(null);
// Entries 0..3 are appended with term 1 (assuming SimpleReplicatedLogEntry takes index, then term).
744 for (int i = 0; i < 4; i++) {
745 actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
746 new MockRaftActorContext.MockPayload("X" + i)));
// The newest entry sits at index 4 and carries the current term (2).
750 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
751 new MockRaftActorContext.MockPayload("D"));
753 actorContext.getReplicatedLog().append(entry);
755 //update follower timestamp
756 leader.markFollowerActive(FOLLOWER_ID);
758 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
759 // installed with a SendInstallSnapshot
// NOTE(review): the trailing `true` is presumably the force-install-snapshot flag - confirm against
// the AppendEntriesReply constructor.
760 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
762 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
764 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
// Expected values mirror the setup above: lastApplied was set to 3 (an entry appended with term 1) and the
// last log entry is index 4 with term 2.
766 assertTrue(cs.isInstallSnapshotInitiated());
767 assertEquals(3, cs.getLastAppliedIndex());
768 assertEquals(1, cs.getLastAppliedTerm());
769 assertEquals(4, cs.getLastIndex());
770 assertEquals(2, cs.getLastTerm());
772 // if an initiate is started again when first is in progress, it should not initiate Capture
773 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
// Same instance as before means no second capture was started.
775 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
// Verifies that handling SendInstallSnapshot keeps the Leader behavior and sends the follower an InstallSnapshot
// whose last-included index/term match the values the snapshot was created with and whose term equals the
// current term recorded in the context.
780 public void testInstallSnapshot() throws Exception {
781 logStart("testInstallSnapshot");
783 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Arbitrary map used as the snapshot payload once serialized with toByteString.
785 Map<String, String> leadersSnapshot = new HashMap<>();
786 leadersSnapshot.put("1", "A");
787 leadersSnapshot.put("2", "B");
788 leadersSnapshot.put("3", "C");
791 actorContext.getReplicatedLog().removeFrom(0);
793 final int lastAppliedIndex = 3;
794 final int snapshotIndex = 2;
795 final int snapshotTerm = 1;
796 final int currentTerm = 2;
798 // set the snapshot variables in replicatedlog
799 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
800 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
801 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
802 actorContext.setCommitIndex(lastAppliedIndex);
803 actorContext.setLastApplied(lastAppliedIndex);
805 leader = new Leader(actorContext);
807 // Initial heartbeat.
808 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Mark the follower as having nothing replicated yet: no matched index, next index 0.
810 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
811 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
813 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
814 Collections.<ReplicatedLogEntry>emptyList(),
815 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
817 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
819 assertTrue(raftBehavior instanceof Leader);
821 // check if installsnapshot gets called with the correct values.
823 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
824 InstallSnapshot.class);
826 assertNotNull(installSnapshot.getData());
827 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
828 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
830 assertEquals(currentTerm, installSnapshot.getTerm());
// Verifies that SendInstallSnapshot results in an InstallSnapshot to the follower even when the replicated log
// records no prior snapshot (snapshot index/term of -1) and the follower's next index is -1. The message must
// carry the index/term the snapshot was created with and the current term recorded in the context.
834 public void testForceInstallSnapshot() throws Exception {
835 logStart("testForceInstallSnapshot");
837 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Arbitrary map used as the snapshot payload once serialized with toByteString.
839 Map<String, String> leadersSnapshot = new HashMap<>();
840 leadersSnapshot.put("1", "A");
841 leadersSnapshot.put("2", "B");
842 leadersSnapshot.put("3", "C");
844 final int lastAppliedIndex = 3;
845 final int snapshotIndex = -1;
846 final int snapshotTerm = -1;
847 final int currentTerm = 2;
849 // set the snapshot variables in replicatedlog
850 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
851 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
852 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
853 actorContext.setCommitIndex(lastAppliedIndex);
854 actorContext.setLastApplied(lastAppliedIndex);
856 leader = new Leader(actorContext);
858 // Initial heartbeat.
859 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Both follower indices are set to -1 for this scenario.
861 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
862 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
864 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
865 Collections.<ReplicatedLogEntry>emptyList(),
866 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
868 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
870 assertTrue(raftBehavior instanceof Leader);
872 // check if installsnapshot gets called with the correct values.
874 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
875 InstallSnapshot.class);
877 assertNotNull(installSnapshot.getData());
878 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
879 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
881 assertEquals(currentTerm, installSnapshot.getTerm());
// Verifies the leader's bookkeeping after a successful InstallSnapshotReply for the final chunk: the follower's
// install-snapshot state is cleared, its match index becomes commitIndex (the index the snapshot was created
// with), its next index is one past that, and the leader no longer holds a snapshot.
885 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
886 logStart("testHandleInstallSnapshotReplyLastChunk");
888 MockRaftActorContext actorContext = createActorContextWithFollower();
890 final int commitIndex = 3;
891 final int snapshotIndex = 2;
892 final int snapshotTerm = 1;
893 final int currentTerm = 2;
895 actorContext.setCommitIndex(commitIndex);
897 leader = new Leader(actorContext);
898 actorContext.setCurrentBehavior(leader);
// Mark the follower as having nothing replicated yet: no matched index, next index 0.
900 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
901 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
903 // Ignore initial heartbeat.
904 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Arbitrary map used as the snapshot payload once serialized with toByteString.
906 Map<String, String> leadersSnapshot = new HashMap<>();
907 leadersSnapshot.put("1", "A");
908 leadersSnapshot.put("2", "B");
909 leadersSnapshot.put("3", "C");
911 // set the snapshot variables in replicatedlog
913 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
914 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
915 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
917 ByteString bs = toByteString(leadersSnapshot);
918 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
919 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
// Attach install-snapshot state to the follower by hand instead of going through SendInstallSnapshot.
920 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
921 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
922 fts.setSnapshotBytes(bs);
923 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
// Advance the state to its last chunk so that the reply below acknowledges the final chunk.
924 while (!fts.isLastChunk(fts.getChunkIndex())) {
926 fts.incrementChunkIndex();
930 actorContext.getReplicatedLog().removeFrom(0);
932 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
933 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
935 assertTrue(raftBehavior instanceof Leader);
937 assertEquals(1, leader.followerLogSize());
938 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
940 assertNull(fli.getInstallSnapshotState());
941 assertEquals(commitIndex, fli.getMatchIndex());
942 assertEquals(commitIndex + 1, fli.getNextIndex());
943 assertFalse(leader.hasSnapshot());
// Verifies chunk-by-chunk snapshot transfer driven by InstallSnapshotReply: SendInstallSnapshot produces chunk 1
// of 3, each successful reply produces a further InstallSnapshot (chunk 2 of 3, then one more), and a reply to
// that last message produces no additional InstallSnapshot.
947 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
948 logStart("testSendSnapshotfromInstallSnapshotReply");
950 MockRaftActorContext actorContext = createActorContextWithFollower();
952 final int commitIndex = 3;
953 final int snapshotIndex = 2;
954 final int snapshotTerm = 1;
955 final int currentTerm = 2;
// Override the snapshot chunk size so the snapshot is split across several InstallSnapshot messages
// (the assertions below expect 3 chunks in total).
957 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
959 public int getSnapshotChunkSize() {
// Long heartbeat (9 s) and isolated-leader-check (10 s) intervals - presumably to keep timer-driven
// messages out of this test; TODO confirm.
963 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
964 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
966 actorContext.setConfigParams(configParams);
967 actorContext.setCommitIndex(commitIndex);
969 leader = new Leader(actorContext);
970 actorContext.setCurrentBehavior(leader);
// Mark the follower as having nothing replicated yet: no matched index, next index 0.
972 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
973 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary map used as the snapshot payload once serialized with toByteString.
975 Map<String, String> leadersSnapshot = new HashMap<>();
976 leadersSnapshot.put("1", "A");
977 leadersSnapshot.put("2", "B");
978 leadersSnapshot.put("3", "C");
980 // set the snapshot variables in replicatedlog
981 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
982 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
983 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
985 ByteString bs = toByteString(leadersSnapshot);
986 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
987 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
988 leader.setSnapshot(snapshot);
990 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
992 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
993 InstallSnapshot.class);
995 assertEquals(1, installSnapshot.getChunkIndex());
996 assertEquals(3, installSnapshot.getTotalChunks());
// Clear the collected messages before each reply so the following expectFirstMatching only sees the
// InstallSnapshot triggered by that reply.
998 followerActor.underlyingActor().clear();
999 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1000 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1002 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1004 assertEquals(2, installSnapshot.getChunkIndex());
1005 assertEquals(3, installSnapshot.getTotalChunks());
1007 followerActor.underlyingActor().clear();
1008 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1009 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
// A further InstallSnapshot must arrive; its chunk index is not asserted here.
1011 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1013 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1014 followerActor.underlyingActor().clear();
1015 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1016 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
// getFirstMatching (unlike expectFirstMatching above) is used because the expected outcome is "no message".
1018 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1020 assertNull(installSnapshot);
// Verifies that an unsuccessful InstallSnapshotReply carrying chunk index -1 makes the leader send chunk 1 (of 3)
// again on the next heartbeat.
1025 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
1026 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1028 MockRaftActorContext actorContext = createActorContextWithFollower();
1030 final int commitIndex = 3;
1031 final int snapshotIndex = 2;
1032 final int snapshotTerm = 1;
1033 final int currentTerm = 2;
// Override the snapshot chunk size so the snapshot is split across several InstallSnapshot messages
// (the assertions below expect 3 chunks in total).
1035 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1037 public int getSnapshotChunkSize() {
1042 actorContext.setCommitIndex(commitIndex);
1044 leader = new Leader(actorContext);
// Mark the follower as having nothing replicated yet: no matched index, next index 0.
1046 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1047 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary map used as the snapshot payload once serialized with toByteString.
1049 Map<String, String> leadersSnapshot = new HashMap<>();
1050 leadersSnapshot.put("1", "A");
1051 leadersSnapshot.put("2", "B");
1052 leadersSnapshot.put("3", "C");
1054 // set the snapshot variables in replicatedlog
1055 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1056 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1057 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1059 ByteString bs = toByteString(leadersSnapshot);
1060 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1061 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1062 leader.setSnapshot(snapshot);
// NOTE(review): the purpose of this fixed 1 s pause is not evident from the test itself - confirm before
// shortening or removing it.
1064 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1065 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1067 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1068 InstallSnapshot.class);
1070 assertEquals(1, installSnapshot.getChunkIndex());
1071 assertEquals(3, installSnapshot.getTotalChunks());
1073 followerActor.underlyingActor().clear();
// Failure reply (success == false) with a chunk index of -1.
1075 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1076 FOLLOWER_ID, -1, false));
// Wait one heartbeat interval, then trigger the heartbeat by hand.
1078 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1079 TimeUnit.MILLISECONDS);
1081 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1083 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
// The first chunk is sent again.
1085 assertEquals(1, installSnapshot.getChunkIndex());
1086 assertEquals(3, installSnapshot.getTotalChunks());
// Verifies the last-chunk hash code carried by InstallSnapshot: the first chunk carries
// LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE and the second chunk carries Arrays.hashCode of the
// first chunk's data.
1090 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1091 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1093 MockRaftActorContext actorContext = createActorContextWithFollower();
1095 final int commitIndex = 3;
1096 final int snapshotIndex = 2;
1097 final int snapshotTerm = 1;
1098 final int currentTerm = 2;
// Override the snapshot chunk size so the snapshot is split across several InstallSnapshot messages
// (the assertions below expect 3 chunks in total).
1100 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1102 public int getSnapshotChunkSize() {
1107 actorContext.setCommitIndex(commitIndex);
1109 leader = new Leader(actorContext);
// Mark the follower as having nothing replicated yet: no matched index, next index 0.
1111 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1112 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary map used as the snapshot payload once serialized with toByteString.
1114 Map<String, String> leadersSnapshot = new HashMap<>();
1115 leadersSnapshot.put("1", "A");
1116 leadersSnapshot.put("2", "B");
1117 leadersSnapshot.put("3", "C");
1119 // set the snapshot variables in replicatedlog
1120 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1121 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1122 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1124 ByteString bs = toByteString(leadersSnapshot);
1125 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1126 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1127 leader.setSnapshot(snapshot);
1129 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1131 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1132 InstallSnapshot.class);
1134 assertEquals(1, installSnapshot.getChunkIndex());
1135 assertEquals(3, installSnapshot.getTotalChunks());
1136 assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1137 installSnapshot.getLastChunkHashCode().get().intValue());
// Remember the hash of chunk 1's bytes; chunk 2 is expected to report it as its last-chunk hash code.
1139 final int hashCode = Arrays.hashCode(installSnapshot.getData());
1141 followerActor.underlyingActor().clear();
// Acknowledge chunk 1 so the leader moves on to chunk 2.
1143 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1144 FOLLOWER_ID, 1, true));
1146 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1148 assertEquals(2, installSnapshot.getChunkIndex());
1149 assertEquals(3, installSnapshot.getTotalChunks());
1150 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
// Exercises LeaderInstallSnapshotState directly with a chunk size of 50: checks the size and index of every chunk
// it hands out and, at the end, the total chunk count.
1154 public void testLeaderInstallSnapshotState() {
1155 logStart("testLeaderInstallSnapshotState");
// Arbitrary map used as the snapshot payload once serialized with toByteString.
1157 Map<String, String> leadersSnapshot = new HashMap<>();
1158 leadersSnapshot.put("1", "A");
1159 leadersSnapshot.put("2", "B");
1160 leadersSnapshot.put("3", "C");
1162 ByteString bs = toByteString(leadersSnapshot);
1163 byte[] barray = bs.toByteArray();
1165 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1166 fts.setSnapshotBytes(bs);
1168 assertEquals(bs.size(), barray.length);
// Walk the serialized bytes in 50-byte steps, mirroring the chunk size given to the state object above.
// `length` is the exclusive end offset of the current chunk, clamped to the array length for the final chunk,
// so `length - i` is the expected chunk size.
1171 for (int i = 0; i < barray.length; i = i + 50) {
1172 int length = i + 50;
1175 if (i + 50 > barray.length) {
1176 length = barray.length;
1179 byte[] chunk = fts.getNextChunk();
1180 assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1181 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
// Mark the chunk as sent successfully and advance unless this was the last chunk.
1183 fts.markSendStatus(true);
1184 if (!fts.isLastChunk(chunkIndex)) {
1185 fts.incrementChunkIndex();
1189 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
// Supplies the behavior under test: a new Leader built on the given context.
1193 protected Leader createBehavior(final RaftActorContext actorContext) {
1194 return new Leader(actorContext);
// Creates the default context for these tests, backed by leaderActor.
1198 protected MockRaftActorContext createActorContext() {
1199 return createActorContext(leaderActor);
// Creates a context for the given actor, using LEADER_ID as the context id.
1203 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1204 return createActorContext(LEADER_ID, actorRef);
// Builds a MockRaftActorContext for the given id and actor, configured with a 50 ms heartbeat interval, an
// election timeout factor of 100000 and this test's payloadVersion.
1207 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1208 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1209 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
// NOTE(review): the very large factor presumably keeps election timeouts from firing during a test - confirm.
1210 configParams.setElectionTimeoutFactor(100000);
1211 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1212 context.setConfigParams(configParams);
1213 context.setPayloadVersion(payloadVersion);
// Creates the default leader context and sets its peer addresses to a single entry mapping FOLLOWER_ID to
// followerActor's path.
1217 private MockRaftActorContext createActorContextWithFollower() {
1218 MockRaftActorContext actorContext = createActorContext();
1219 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1220 followerActor.path().toString()).build());
1221 return actorContext;
// Creates a context for followerActor under FOLLOWER_ID, replaces its config with a fresh DefaultConfigParamsImpl
// whose election timeout factor is 10000, and sets its peer addresses to a single entry mapping LEADER_ID to
// leaderActor's path.
1224 private MockRaftActorContext createFollowerActorContextWithLeader() {
1225 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// This replaces the config installed by createActorContext, so only the settings made here apply.
1226 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1227 followerConfig.setElectionTimeoutFactor(10000);
1228 followerActorContext.setConfigParams(followerConfig);
1229 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1230 return followerActorContext;
// Starts a Leader with commit index 1 against a real Follower behavior that has an identically built log and the
// same commit index, then checks the initial AppendEntries (no entries, leader commit -1, prev log index 0) and
// the follower's reply (last index 2, last term 1).
1234 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1235 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1237 final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1239 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Install a Follower behavior on followerActor so that it produces the AppendEntriesReply checked below.
1241 Follower follower = new Follower(followerActorContext);
1242 followerActor.underlyingActor().setBehavior(follower);
1243 followerActorContext.setCurrentBehavior(follower);
1245 Map<String, String> peerAddresses = new HashMap<>();
1246 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1248 leaderActorContext.setPeerAddresses(peerAddresses);
1250 leaderActorContext.getReplicatedLog().removeFrom(0);
// Leader log built with createEntries(0, 3, 1); the reply assertions below imply last index 2 and term 1.
1253 leaderActorContext.setReplicatedLog(
1254 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1256 leaderActorContext.setCommitIndex(1);
1258 followerActorContext.getReplicatedLog().removeFrom(0);
1260 // follower too has the exact same log entries and has the same commit index
1261 followerActorContext.setReplicatedLog(
1262 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1264 followerActorContext.setCommitIndex(1);
1266 leader = new Leader(leaderActorContext);
1267 leaderActorContext.setCurrentBehavior(leader);
1269 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Initial heartbeat: empty, with a leader commit of -1 rather than the context's commit index of 1.
1271 assertEquals(-1, appendEntries.getLeaderCommit());
1272 assertEquals(0, appendEntries.getEntries().size());
1273 assertEquals(0, appendEntries.getPrevLogIndex());
1275 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1276 leaderActor, AppendEntriesReply.class);
1278 assertEquals(2, appendEntriesReply.getLogLastIndex());
1279 assertEquals(1, appendEntriesReply.getLogLastTerm());
1281 // follower returns its next index
// NOTE(review): the two assertions below repeat the two directly above and add no coverage - confirm whether
// a different check was intended.
1282 assertEquals(2, appendEntriesReply.getLogLastIndex());
1283 assertEquals(1, appendEntriesReply.getLogLastTerm());
// Starts a Leader with commit index 1 against a Follower that has an identically built log but commit index 2.
// After the leader processes the follower's first reply, the next heartbeat must carry leader commit 2 and prev
// log index 2, and the follower's commit index must still be 2.
1289 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1290 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1292 final MockRaftActorContext leaderActorContext = createActorContext();
1294 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1295 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
// Install a Follower behavior on followerActor so that it produces the AppendEntriesReply messages checked below.
1297 Follower follower = new Follower(followerActorContext);
1298 followerActor.underlyingActor().setBehavior(follower);
1299 followerActorContext.setCurrentBehavior(follower);
1301 Map<String, String> leaderPeerAddresses = new HashMap<>();
1302 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1304 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1306 leaderActorContext.getReplicatedLog().removeFrom(0);
1308 leaderActorContext.setReplicatedLog(
1309 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1311 leaderActorContext.setCommitIndex(1);
1313 followerActorContext.getReplicatedLog().removeFrom(0);
1315 followerActorContext.setReplicatedLog(
1316 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1318 // follower has the same log entries but its commit index > leaders commit index
1319 followerActorContext.setCommitIndex(2);
1321 leader = new Leader(leaderActorContext);
1323 // Initial heartbeat
1324 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1326 assertEquals(-1, appendEntries.getLeaderCommit());
1327 assertEquals(0, appendEntries.getEntries().size());
1328 assertEquals(0, appendEntries.getPrevLogIndex());
1330 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1331 leaderActor, AppendEntriesReply.class);
1333 assertEquals(2, appendEntriesReply.getLogLastIndex());
1334 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): this installs the `follower` behavior on leaderActor, whereas the sibling
// testHandleAppendEntriesReplyFailure* tests install `leader` at the equivalent step - confirm this is intended.
1336 leaderActor.underlyingActor().setBehavior(follower);
1337 leader.handleMessage(followerActor, appendEntriesReply);
// Start the second round with empty message collectors on both actors.
1339 leaderActor.underlyingActor().clear();
1340 followerActor.underlyingActor().clear();
// Wait one heartbeat interval, then trigger the heartbeat by hand.
1342 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1343 TimeUnit.MILLISECONDS);
1345 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1347 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Second heartbeat: still no entries, but leader commit and prev log index are now both 2.
1349 assertEquals(2, appendEntries.getLeaderCommit());
1350 assertEquals(0, appendEntries.getEntries().size());
1351 assertEquals(2, appendEntries.getPrevLogIndex());
1353 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1355 assertEquals(2, appendEntriesReply.getLogLastIndex());
1356 assertEquals(1, appendEntriesReply.getLogLastTerm());
1358 assertEquals(2, followerActorContext.getCommitIndex());
// Verifies log catch-up when the follower's log is shorter than the leader's. After the leader handles the
// follower's reply to the initial heartbeat, it sends the entries at indices 1 and 2 (prev log index 0), the
// follower applies both, and the follower's commit index and last log index both reach 2.
1364 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1365 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1367 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// 1000 s heartbeat interval - presumably so that no timer-driven heartbeat interferes with the explicitly
// driven message exchange below; TODO confirm.
1368 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1369 new FiniteDuration(1000, TimeUnit.SECONDS));
1371 leaderActorContext.setReplicatedLog(
1372 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1373 long leaderCommitIndex = 2;
1374 leaderActorContext.setCommitIndex(leaderCommitIndex);
1375 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the leader's entries at indices 1 and 2 for the payload comparisons below.
1377 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1378 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1380 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// The follower's log is built with createEntries(0, 1, 1), i.e. fewer entries than the leader's.
1382 followerActorContext.setReplicatedLog(
1383 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1384 followerActorContext.setCommitIndex(0);
1385 followerActorContext.setLastApplied(0);
1387 Follower follower = new Follower(followerActorContext);
1388 followerActor.underlyingActor().setBehavior(follower);
1390 leader = new Leader(leaderActorContext);
1392 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1393 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1394 AppendEntriesReply.class);
// Reset both collectors so the later expectations only see messages from the second round.
1396 MessageCollectorActor.clearMessages(followerActor);
1397 MessageCollectorActor.clearMessages(leaderActor);
1399 // Verify initial AppendEntries sent.
1400 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1401 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1402 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1404 leaderActor.underlyingActor().setBehavior(leader);
// Feed the follower's first reply to the leader by hand; this triggers the catch-up AppendEntries.
1406 leader.handleMessage(followerActor, appendEntriesReply);
1408 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1409 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1411 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1412 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1413 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1415 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1416 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1417 appendEntries.getEntries().get(0).getData());
1418 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1419 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1420 appendEntries.getEntries().get(1).getData());
1422 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1423 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
// The follower must have emitted one ApplyState per newly received entry.
1425 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1427 ApplyState applyState = applyStateList.get(0);
1428 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1429 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1430 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1431 applyState.getReplicatedLogEntry().getData());
1433 applyState = applyStateList.get(1);
1434 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1435 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1436 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1437 applyState.getReplicatedLogEntry().getData());
1439 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1440 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
// Verifies log catch-up when the follower's log is empty. After the leader handles the follower's reply to the
// initial heartbeat, it sends both of its entries (indices 0 and 1, prev log index -1), the follower applies
// both, and the follower's commit index and last log index both reach 1.
1444 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1445 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1447 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// 1000 s heartbeat interval - presumably so that no timer-driven heartbeat interferes with the explicitly
// driven message exchange below; TODO confirm.
1448 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1449 new FiniteDuration(1000, TimeUnit.SECONDS));
1451 leaderActorContext.setReplicatedLog(
1452 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1453 long leaderCommitIndex = 1;
1454 leaderActorContext.setCommitIndex(leaderCommitIndex);
1455 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the leader's entries at indices 0 and 1 for the payload comparisons below.
1457 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1458 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1460 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// The follower starts with a log built without any createEntries call and with nothing committed or applied.
1462 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1463 followerActorContext.setCommitIndex(-1);
1464 followerActorContext.setLastApplied(-1);
1466 Follower follower = new Follower(followerActorContext);
1467 followerActor.underlyingActor().setBehavior(follower);
1468 followerActorContext.setCurrentBehavior(follower);
1470 leader = new Leader(leaderActorContext);
1472 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1473 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1474 AppendEntriesReply.class);
// Reset both collectors so the later expectations only see messages from the second round.
1476 MessageCollectorActor.clearMessages(followerActor);
1477 MessageCollectorActor.clearMessages(leaderActor);
1479 // Verify the initial AppendEntries: no entries, a leader commit of -1 and a prev log index of 0.
1480 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1481 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1482 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1484 leaderActor.underlyingActor().setBehavior(leader);
1485 leaderActorContext.setCurrentBehavior(leader);
// Feed the follower's first reply to the leader by hand; this triggers the catch-up AppendEntries.
1487 leader.handleMessage(followerActor, appendEntriesReply);
1489 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1490 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1492 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1493 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1494 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1496 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1497 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1498 appendEntries.getEntries().get(0).getData());
1499 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1500 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1501 appendEntries.getEntries().get(1).getData());
1503 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1504 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
// The follower must have emitted one ApplyState per newly received entry.
1506 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1508 ApplyState applyState = applyStateList.get(0);
1509 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1510 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1511 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1512 applyState.getReplicatedLogEntry().getData());
1514 applyState = applyStateList.get(1);
1515 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1516 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1517 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1518 applyState.getReplicatedLogEntry().getData());
1520 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1521 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1525 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1526 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1528 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1529 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1530 new FiniteDuration(1000, TimeUnit.SECONDS));
1532 leaderActorContext.setReplicatedLog(
1533 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1534 long leaderCommitIndex = 1;
1535 leaderActorContext.setCommitIndex(leaderCommitIndex);
1536 leaderActorContext.setLastApplied(leaderCommitIndex);
1538 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1539 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1541 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1543 followerActorContext.setReplicatedLog(
1544 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1545 followerActorContext.setCommitIndex(-1);
1546 followerActorContext.setLastApplied(-1);
1548 Follower follower = new Follower(followerActorContext);
1549 followerActor.underlyingActor().setBehavior(follower);
1550 followerActorContext.setCurrentBehavior(follower);
1552 leader = new Leader(leaderActorContext);
1554 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1555 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1556 AppendEntriesReply.class);
1558 MessageCollectorActor.clearMessages(followerActor);
1559 MessageCollectorActor.clearMessages(leaderActor);
1561 // Verify initial AppendEntries sent with the leader's current commit index.
1562 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1563 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1564 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1566 leaderActor.underlyingActor().setBehavior(leader);
1567 leaderActorContext.setCurrentBehavior(leader);
1569 leader.handleMessage(followerActor, appendEntriesReply);
1571 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1572 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1574 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1575 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1576 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1578 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1579 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1580 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1581 appendEntries.getEntries().get(0).getData());
1582 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1583 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1584 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1585 appendEntries.getEntries().get(1).getData());
1587 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1588 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1590 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1592 ApplyState applyState = applyStateList.get(0);
1593 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1594 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1595 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1596 applyState.getReplicatedLogEntry().getData());
1598 applyState = applyStateList.get(1);
1599 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1600 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1601 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1602 applyState.getReplicatedLogEntry().getData());
1604 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1605 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1606 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1610 public void testHandleAppendEntriesReplyWithNewerTerm() {
1611 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1613 MockRaftActorContext leaderActorContext = createActorContext();
1614 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1615 new FiniteDuration(10000, TimeUnit.SECONDS));
1617 leaderActorContext.setReplicatedLog(
1618 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1620 leader = new Leader(leaderActorContext);
1621 leaderActor.underlyingActor().setBehavior(leader);
1622 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1624 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1625 AppendEntriesReply.class);
1627 assertEquals(false, appendEntriesReply.isSuccess());
1628 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1630 MessageCollectorActor.clearMessages(leaderActor);
1634 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1635 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1637 MockRaftActorContext leaderActorContext = createActorContext();
1638 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1639 new FiniteDuration(10000, TimeUnit.SECONDS));
1641 leaderActorContext.setReplicatedLog(
1642 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1643 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1645 leader = new Leader(leaderActorContext);
1646 leaderActor.underlyingActor().setBehavior(leader);
1647 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1649 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1650 AppendEntriesReply.class);
1652 assertEquals(false, appendEntriesReply.isSuccess());
1653 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1655 MessageCollectorActor.clearMessages(leaderActor);
1659 public void testHandleAppendEntriesReplySuccess() throws Exception {
1660 logStart("testHandleAppendEntriesReplySuccess");
1662 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1664 leaderActorContext.setReplicatedLog(
1665 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1667 leaderActorContext.setCommitIndex(1);
1668 leaderActorContext.setLastApplied(1);
1669 leaderActorContext.getTermInformation().update(1, "leader");
1671 leader = new Leader(leaderActorContext);
1673 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1675 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1676 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1678 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1680 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1682 assertEquals(RaftState.Leader, raftActorBehavior.state());
1684 assertEquals(2, leaderActorContext.getCommitIndex());
1686 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1687 leaderActor, ApplyJournalEntries.class);
1689 assertEquals(2, leaderActorContext.getLastApplied());
1691 assertEquals(2, applyJournalEntries.getToIndex());
1693 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1696 assertEquals(1,applyStateList.size());
1698 ApplyState applyState = applyStateList.get(0);
1700 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1702 assertEquals(2, followerInfo.getMatchIndex());
1703 assertEquals(3, followerInfo.getNextIndex());
1704 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1705 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1709 public void testHandleAppendEntriesReplyUnknownFollower() {
1710 logStart("testHandleAppendEntriesReplyUnknownFollower");
1712 MockRaftActorContext leaderActorContext = createActorContext();
1714 leader = new Leader(leaderActorContext);
1716 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1718 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1720 assertEquals(RaftState.Leader, raftActorBehavior.state());
1724 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1725 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1727 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1728 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1729 new FiniteDuration(1000, TimeUnit.SECONDS));
1730 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1732 leaderActorContext.setReplicatedLog(
1733 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1734 long leaderCommitIndex = 3;
1735 leaderActorContext.setCommitIndex(leaderCommitIndex);
1736 leaderActorContext.setLastApplied(leaderCommitIndex);
1738 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1739 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1740 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1741 final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1743 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1745 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1746 followerActorContext.setCommitIndex(-1);
1747 followerActorContext.setLastApplied(-1);
1749 Follower follower = new Follower(followerActorContext);
1750 followerActor.underlyingActor().setBehavior(follower);
1751 followerActorContext.setCurrentBehavior(follower);
1753 leader = new Leader(leaderActorContext);
1755 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1756 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1757 AppendEntriesReply.class);
1759 MessageCollectorActor.clearMessages(followerActor);
1760 MessageCollectorActor.clearMessages(leaderActor);
1762 // Verify initial AppendEntries sent with the leader's current commit index.
1763 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1764 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1765 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1767 leaderActor.underlyingActor().setBehavior(leader);
1768 leaderActorContext.setCurrentBehavior(leader);
1770 leader.handleMessage(followerActor, appendEntriesReply);
1772 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1773 AppendEntries.class, 2);
1774 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1776 appendEntries = appendEntriesList.get(0);
1777 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1778 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1779 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1781 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1782 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1783 appendEntries.getEntries().get(0).getData());
1784 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1785 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1786 appendEntries.getEntries().get(1).getData());
1788 appendEntries = appendEntriesList.get(1);
1789 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1790 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1791 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1793 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1794 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1795 appendEntries.getEntries().get(0).getData());
1796 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1797 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1798 appendEntries.getEntries().get(1).getData());
1800 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1801 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1803 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1805 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1806 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1810 public void testHandleRequestVoteReply() {
1811 logStart("testHandleRequestVoteReply");
1813 MockRaftActorContext leaderActorContext = createActorContext();
1815 leader = new Leader(leaderActorContext);
1817 // Should be a no-op.
1818 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1819 new RequestVoteReply(1, true));
1821 assertEquals(RaftState.Leader, raftActorBehavior.state());
1823 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1825 assertEquals(RaftState.Leader, raftActorBehavior.state());
1829 public void testIsolatedLeaderCheckNoFollowers() {
1830 logStart("testIsolatedLeaderCheckNoFollowers");
1832 MockRaftActorContext leaderActorContext = createActorContext();
1834 leader = new Leader(leaderActorContext);
1835 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1836 assertTrue(newBehavior instanceof Leader);
1840 public void testIsolatedLeaderCheckNoVotingFollowers() {
1841 logStart("testIsolatedLeaderCheckNoVotingFollowers");
1843 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1844 Follower follower = new Follower(followerActorContext);
1845 followerActor.underlyingActor().setBehavior(follower);
1847 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1848 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1849 new FiniteDuration(1000, TimeUnit.SECONDS));
1850 leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1852 leader = new Leader(leaderActorContext);
1853 leader.getFollower(FOLLOWER_ID).markFollowerActive();
1854 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1855 assertTrue("Expected Leader", newBehavior instanceof Leader);
1858 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
1859 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1860 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1862 MockRaftActorContext leaderActorContext = createActorContext();
1864 Map<String, String> peerAddresses = new HashMap<>();
1865 peerAddresses.put("follower-1", followerActor1.path().toString());
1866 peerAddresses.put("follower-2", followerActor2.path().toString());
1868 leaderActorContext.setPeerAddresses(peerAddresses);
1869 leaderActorContext.setRaftPolicy(raftPolicy);
1871 leader = new Leader(leaderActorContext);
1873 leader.markFollowerActive("follower-1");
1874 leader.markFollowerActive("follower-2");
1875 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1876 assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1878 // kill 1 follower and verify if that got killed
1879 final JavaTestKit probe = new JavaTestKit(getSystem());
1880 probe.watch(followerActor1);
1881 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1882 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1883 assertEquals(termMsg1.getActor(), followerActor1);
1885 leader.markFollowerInActive("follower-1");
1886 leader.markFollowerActive("follower-2");
1887 newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1888 assertTrue("Behavior not instance of Leader when majority of followers are active",
1889 newBehavior instanceof Leader);
1891 // kill 2nd follower and leader should change to Isolated leader
1892 followerActor2.tell(PoisonPill.getInstance(), null);
1893 probe.watch(followerActor2);
1894 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1895 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1896 assertEquals(termMsg2.getActor(), followerActor2);
1898 leader.markFollowerInActive("follower-2");
1899 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1903 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1904 logStart("testIsolatedLeaderCheckTwoFollowers");
1906 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1908 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1909 newBehavior instanceof IsolatedLeader);
1913 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1914 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1916 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1918 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1919 newBehavior instanceof Leader);
1923 public void testLaggingFollowerStarvation() throws Exception {
1924 logStart("testLaggingFollowerStarvation");
1926 String leaderActorId = actorFactory.generateActorId("leader");
1927 String follower1ActorId = actorFactory.generateActorId("follower");
1928 String follower2ActorId = actorFactory.generateActorId("follower");
1930 final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1931 final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1933 MockRaftActorContext leaderActorContext =
1934 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1936 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1937 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1938 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1940 leaderActorContext.setConfigParams(configParams);
1942 leaderActorContext.setReplicatedLog(
1943 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1945 Map<String, String> peerAddresses = new HashMap<>();
1946 peerAddresses.put(follower1ActorId,
1947 follower1Actor.path().toString());
1948 peerAddresses.put(follower2ActorId,
1949 follower2Actor.path().toString());
1951 leaderActorContext.setPeerAddresses(peerAddresses);
1952 leaderActorContext.getTermInformation().update(1, leaderActorId);
1954 leader = createBehavior(leaderActorContext);
1956 leaderActor.underlyingActor().setBehavior(leader);
1958 for (int i = 1; i < 6; i++) {
1959 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1960 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
1961 new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1962 assertTrue(newBehavior == leader);
1963 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1966 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1967 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1969 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1970 heartbeats.size() > 1);
1972 // Check if follower-2 got AppendEntries during this time and was not starved
1973 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1975 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1976 appendEntries.size() > 1);
1980 public void testReplicationConsensusWithNonVotingFollower() {
1981 logStart("testReplicationConsensusWithNonVotingFollower");
1983 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1984 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1985 new FiniteDuration(1000, TimeUnit.SECONDS));
1987 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1988 leaderActorContext.setCommitIndex(-1);
1989 leaderActorContext.setLastApplied(-1);
1991 String nonVotingFollowerId = "nonvoting-follower";
1992 TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1993 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1995 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
1996 VotingState.NON_VOTING);
1998 leader = new Leader(leaderActorContext);
1999 leaderActorContext.setCurrentBehavior(leader);
2001 // Ignore initial heartbeats
2002 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2003 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2005 MessageCollectorActor.clearMessages(followerActor);
2006 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2007 MessageCollectorActor.clearMessages(leaderActor);
2009 // Send a Replicate message and wait for AppendEntries.
2010 sendReplicate(leaderActorContext, 0);
2012 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2013 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2015 // Send reply only from the voting follower and verify consensus via ApplyState.
2016 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2018 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2020 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2022 MessageCollectorActor.clearMessages(followerActor);
2023 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2024 MessageCollectorActor.clearMessages(leaderActor);
2026 // Send another Replicate message
2027 sendReplicate(leaderActorContext, 1);
2029 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2030 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2031 AppendEntries.class);
2032 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2033 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2035 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2036 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2038 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2040 // Send reply from the voting follower and verify consensus.
2041 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2043 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2047 public void testTransferLeadershipWithFollowerInSync() {
2048 logStart("testTransferLeadershipWithFollowerInSync");
2050 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2051 leaderActorContext.setLastApplied(-1);
2052 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2053 new FiniteDuration(1000, TimeUnit.SECONDS));
2054 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2056 leader = new Leader(leaderActorContext);
2057 leaderActorContext.setCurrentBehavior(leader);
2059 // Initial heartbeat
2060 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2061 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2062 MessageCollectorActor.clearMessages(followerActor);
2064 sendReplicate(leaderActorContext, 0);
2065 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2067 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2068 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2069 MessageCollectorActor.clearMessages(followerActor);
2071 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2072 leader.transferLeadership(mockTransferCohort);
2074 verify(mockTransferCohort, never()).transferComplete();
2075 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2076 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2078 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2079 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2081 // Leader should force an election timeout
2082 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2084 verify(mockTransferCohort).transferComplete();
2088 public void testTransferLeadershipWithEmptyLog() {
2089 logStart("testTransferLeadershipWithEmptyLog");
2091 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2092 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2093 new FiniteDuration(1000, TimeUnit.SECONDS));
2094 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2096 leader = new Leader(leaderActorContext);
2097 leaderActorContext.setCurrentBehavior(leader);
2099 // Initial heartbeat
2100 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2101 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2102 MessageCollectorActor.clearMessages(followerActor);
2104 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2105 leader.transferLeadership(mockTransferCohort);
2107 verify(mockTransferCohort, never()).transferComplete();
2108 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2109 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2111 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2112 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2114 // Leader should force an election timeout
2115 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2117 verify(mockTransferCohort).transferComplete();
2121 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2122 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2124 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2125 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2126 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2128 leader = new Leader(leaderActorContext);
2129 leaderActorContext.setCurrentBehavior(leader);
2131 // Initial heartbeat
2132 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2133 MessageCollectorActor.clearMessages(followerActor);
2135 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2136 leader.transferLeadership(mockTransferCohort);
2138 verify(mockTransferCohort, never()).transferComplete();
2140 // Sync up the follower.
2141 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2142 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2143 MessageCollectorActor.clearMessages(followerActor);
2145 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2146 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2147 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2148 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2149 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2151 // Leader should force an election timeout
2152 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2154 verify(mockTransferCohort).transferComplete();
2158 public void testTransferLeadershipWithFollowerSyncTimeout() {
2159 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2161 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2162 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2163 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2164 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2165 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2167 leader = new Leader(leaderActorContext);
2168 leaderActorContext.setCurrentBehavior(leader);
2170 // Initial heartbeat
2171 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2172 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2173 MessageCollectorActor.clearMessages(followerActor);
2175 sendReplicate(leaderActorContext, 0);
2176 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2178 MessageCollectorActor.clearMessages(followerActor);
2180 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2181 leader.transferLeadership(mockTransferCohort);
2183 verify(mockTransferCohort, never()).transferComplete();
2185 // Send heartbeats to time out the transfer.
2186 for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2187 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2188 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2189 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2192 verify(mockTransferCohort).abortTransfer();
2193 verify(mockTransferCohort, never()).transferComplete();
2194 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2198 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2199 ActorRef actorRef, RaftRPC rpc) throws Exception {
2200 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2201 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2204 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2206 private final long electionTimeOutIntervalMillis;
2207 private final int snapshotChunkSize;
2209 MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2211 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2212 this.snapshotChunkSize = snapshotChunkSize;
2216 public FiniteDuration getElectionTimeOutInterval() {
2217 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2221 public int getSnapshotChunkSize() {
2222 return snapshotChunkSize;