2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.verify;
22 import akka.actor.ActorRef;
23 import akka.actor.PoisonPill;
24 import akka.actor.Props;
25 import akka.actor.Terminated;
26 import akka.testkit.JavaTestKit;
27 import akka.testkit.TestActorRef;
28 import com.google.common.base.Optional;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.io.ByteSource;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import com.google.protobuf.ByteString;
33 import java.io.IOException;
34 import java.io.OutputStream;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.After;
43 import org.junit.Test;
44 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
45 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
46 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
47 import org.opendaylight.controller.cluster.raft.RaftActorContext;
48 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
49 import org.opendaylight.controller.cluster.raft.RaftState;
50 import org.opendaylight.controller.cluster.raft.RaftVersions;
51 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
52 import org.opendaylight.controller.cluster.raft.VotingState;
53 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
54 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
55 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
56 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
57 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
58 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
59 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
60 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
61 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
62 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
63 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
64 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
65 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
66 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
67 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
68 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
69 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
70 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
71 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
72 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
73 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
74 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
75 import org.opendaylight.yangtools.concepts.Identifier;
76 import scala.concurrent.duration.FiniteDuration;
78 public class LeaderTest extends AbstractLeaderTest<Leader> {
// Identifiers for the follower and leader peers; FOLLOWER_ID is the id used in the simulated
// AppendEntriesReply messages and in leader.getFollower(...) lookups throughout these tests.
80 static final String FOLLOWER_ID = "follower";
81 public static final String LEADER_ID = "leader";
// Test actor standing in for the leader; created through the shared actorFactory.
83 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
84 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
// Test actor standing in for the follower; the tests collect the messages it receives and assert on them.
86 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
87 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
// The Leader behavior under test; each test method assigns its own instance.
89 private Leader leader;
// Payload version set on the actor context and expected on outgoing AppendEntries messages.
90 private final short payloadVersion = 5;
94 public void tearDown() throws Exception {
// Verifies that Leader.handleMessage returns null for a message it does not recognize (a plain String).
103 public void testHandleMessageForUnknownMessage() throws Exception {
104 logStart("testHandleMessageForUnknownMessage");
106 leader = new Leader(createActorContext());
108 // handleMessage should return null when it receives an unknown message
109 assertNull(leader.handleMessage(followerActor, "foo"));
// Verifies the AppendEntries messages the leader sends to its follower: an immediate one with no
// entries right after the Leader is created, and, after a simulated follower reply and a
// SendHeartBeat, one that carries the leader's last log entry.
113 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
114 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
116 MockRaftActorContext actorContext = createActorContextWithFollower();
117 actorContext.setCommitIndex(-1);
118 actorContext.setPayloadVersion(payloadVersion);
121 actorContext.getTermInformation().update(term, "");
123 leader = new Leader(actorContext);
124 actorContext.setCurrentBehavior(leader);
126 // Leader should send an immediate heartbeat with no entries as follower is inactive.
127 final long lastIndex = actorContext.getReplicatedLog().lastIndex();
128 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
129 assertEquals("getTerm", term, appendEntries.getTerm());
130 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
131 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
132 assertEquals("Entries size", 0, appendEntries.getEntries().size());
133 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
135 // The follower would normally reply - simulate that explicitly here.
// The reply reports lastIndex - 1 as the follower's last log index, so the next AppendEntries is
// expected to carry the one remaining entry at lastIndex.
136 leader.handleMessage(followerActor, new AppendEntriesReply(
137 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
138 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only messages sent from here on are inspected.
140 followerActor.underlyingActor().clear();
142 // Sleep for the heartbeat interval so AppendEntries is sent.
143 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
144 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
146 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
148 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
149 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
150 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
151 assertEquals("Entries size", 1, appendEntries.getEntries().size());
152 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
153 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
154 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
// Convenience overload: replicates a new entry at the given index using term 1.
158 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
159 return sendReplicate(actorContext, 1, index);
// Appends a new log entry (payload "foo") with the given term and index to the context's replicated
// log, then passes a Replicate message for that entry to the leader under test.
// Returns whatever behavior leader.handleMessage returns. The first two Replicate arguments are
// passed as null here.
162 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
163 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
164 SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
165 actorContext.getReplicatedLog().append(newEntry);
166 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
// Verifies that handling a Replicate message sends an AppendEntries carrying the new entry to an
// active follower, leaves the behavior as Leader, and that the commit index is still lastIndex
// afterwards (no follower reply for the new entry is simulated).
170 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
171 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
173 MockRaftActorContext actorContext = createActorContextWithFollower();
176 actorContext.getTermInformation().update(term, "");
178 leader = new Leader(actorContext);
180 // Leader will send an immediate heartbeat - ignore it.
181 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
183 // The follower would normally reply - simulate that explicitly here.
184 long lastIndex = actorContext.getReplicatedLog().lastIndex();
185 leader.handleMessage(followerActor, new AppendEntriesReply(
186 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
187 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only the AppendEntries triggered by the Replicate is inspected.
189 followerActor.underlyingActor().clear();
191 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
193 // State should not change
194 assertTrue(raftBehavior instanceof Leader);
196 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
197 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
198 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
199 assertEquals("Entries size", 1, appendEntries.getEntries().size());
200 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
201 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
202 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
203 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
// Verifies that log entries from a previous term are not committed merely because the follower
// reports it is up to date, and that the commit index advances to the new index once an entry
// from the leader's current term has been replicated and acknowledged.
207 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
208 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
210 MockRaftActorContext actorContext = createActorContextWithFollower();
211 actorContext.setCommitIndex(-1);
212 actorContext.setLastApplied(-1);
214 // The raft context is initialized with a couple log entries. However the commitIndex
215 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
216 // committed and applied. Now it regains leadership with a higher term (2).
217 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
218 long newTerm = prevTerm + 1;
219 actorContext.getTermInformation().update(newTerm, "");
221 leader = new Leader(actorContext);
222 actorContext.setCurrentBehavior(leader);
224 // Leader will send an immediate heartbeat - ignore it.
225 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
227 // The follower replies with the leader's current last index and term, simulating that it is
228 // up to date with the leader.
229 long lastIndex = actorContext.getReplicatedLog().lastIndex();
230 leader.handleMessage(followerActor, new AppendEntriesReply(
231 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
233 // The commit index should not get updated even though consensus was reached. This is b/c the
234 // last entry's term does not match the current term. As per §5.4.1, "Raft never commits log entries
235 // from previous terms by counting replicas".
236 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
238 followerActor.underlyingActor().clear();
240 // Now replicate a new entry with the new term 2.
241 long newIndex = lastIndex + 1;
242 sendReplicate(actorContext, newTerm, newIndex);
244 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
245 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
246 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
247 assertEquals("Entries size", 1, appendEntries.getEntries().size());
248 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
249 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
250 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
252 // The follower replies with success. The leader should now update the commit index to the new index
253 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
254 // prior entries are committed indirectly".
255 leader.handleMessage(followerActor, new AppendEntriesReply(
256 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
258 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
// Verifies that, with the raft policy produced by createRaftPolicy(true, true), the commit index
// is already lastIndex + 1 right after the Replicate is handled, although no follower reply for
// the new entry has been simulated.
// NOTE(review): the meaning of the two boolean arguments is defined by createRaftPolicy, which is
// not visible here - confirm against that helper.
262 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
263 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
265 MockRaftActorContext actorContext = createActorContextWithFollower();
266 actorContext.setRaftPolicy(createRaftPolicy(true, true));
269 actorContext.getTermInformation().update(term, "");
271 leader = new Leader(actorContext);
273 // Leader will send an immediate heartbeat - ignore it.
274 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
276 // The follower would normally reply - simulate that explicitly here.
277 long lastIndex = actorContext.getReplicatedLog().lastIndex();
278 leader.handleMessage(followerActor, new AppendEntriesReply(
279 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
280 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only the AppendEntries triggered by the Replicate is inspected.
282 followerActor.underlyingActor().clear();
284 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
286 // State should not change
287 assertTrue(raftBehavior instanceof Leader);
289 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
290 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
291 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
292 assertEquals("Entries size", 1, appendEntries.getEntries().size());
293 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
294 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
295 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
296 assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
// Verifies that five Replicate messages handled back-to-back, with no follower reply in between
// and well inside the (5 second) heartbeat interval, result in only one AppendEntries being sent
// to the follower.
300 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
301 logStart("testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent");
303 MockRaftActorContext actorContext = createActorContextWithFollower();
// Use a long heartbeat interval so that no heartbeat-driven AppendEntries is counted below.
304 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
306 public FiniteDuration getHeartBeatInterval() {
307 return FiniteDuration.apply(5, TimeUnit.SECONDS);
312 actorContext.getTermInformation().update(term, "");
314 leader = new Leader(actorContext);
316 // Leader will send an immediate heartbeat - ignore it.
317 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
319 // The follower would normally reply - simulate that explicitly here.
320 long lastIndex = actorContext.getReplicatedLog().lastIndex();
321 leader.handleMessage(followerActor, new AppendEntriesReply(
322 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
323 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only messages triggered by the Replicates are counted.
325 followerActor.underlyingActor().clear();
327 for (int i = 0; i < 5; i++) {
328 sendReplicate(actorContext, lastIndex + i + 1);
331 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
332 // We expect only 1 message to be sent because of two reasons,
333 // - an append entries reply was not received
334 // - the heartbeat interval has not expired
335 // In this scenario if multiple messages are sent they would likely be duplicates
336 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
// Verifies that a Replicate is forwarded to the follower as an AppendEntries whenever the
// previous one has been acknowledged: three Replicates each followed by a reply, then two more
// without replies, are expected to yield four AppendEntries in total.
340 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
341 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
343 MockRaftActorContext actorContext = createActorContextWithFollower();
// Use a long heartbeat interval so that no heartbeat-driven AppendEntries is counted below.
344 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
346 public FiniteDuration getHeartBeatInterval() {
347 return FiniteDuration.apply(5, TimeUnit.SECONDS);
352 actorContext.getTermInformation().update(term, "");
354 leader = new Leader(actorContext);
356 // Leader will send an immediate heartbeat - ignore it.
357 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
359 // The follower would normally reply - simulate that explicitly here.
360 long lastIndex = actorContext.getReplicatedLog().lastIndex();
361 leader.handleMessage(followerActor, new AppendEntriesReply(
362 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
363 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only messages triggered by the Replicates are counted.
365 followerActor.underlyingActor().clear();
// First three entries: each Replicate is immediately acknowledged by the follower.
367 for (int i = 0; i < 3; i++) {
368 sendReplicate(actorContext, lastIndex + i + 1);
369 leader.handleMessage(followerActor, new AppendEntriesReply(
370 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
// Last two entries: replicated without any follower reply.
374 for (int i = 3; i < 5; i++) {
375 sendReplicate(actorContext, lastIndex + i + 1);
378 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
379 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
380 // get sent to the follower - but not the 5th
381 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
// The i-th collected message should start with the entry replicated in iteration i, i.e. the one
// at lastIndex + i + 1. Expected value goes first so a failure is reported the right way round.
383 for (int i = 0; i < 4; i++) {
384 long actual = allMessages.get(i).getEntries().get(0).getIndex();
385 assertEquals("Entry getIndex", lastIndex + i + 1, actual);
// Verifies that an entry which was sent to the follower but not acknowledged is sent again on the
// next heartbeat: after one Replicate and one SendHeartBeat (handled after the 500 ms heartbeat
// interval has elapsed) two AppendEntries are expected, both carrying the same single entry.
390 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
391 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
393 MockRaftActorContext actorContext = createActorContextWithFollower();
394 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
396 public FiniteDuration getHeartBeatInterval() {
397 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
402 actorContext.getTermInformation().update(term, "");
404 leader = new Leader(actorContext);
406 // Leader will send an immediate heartbeat - ignore it.
407 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
409 // The follower would normally reply - simulate that explicitly here.
410 long lastIndex = actorContext.getReplicatedLog().lastIndex();
411 leader.handleMessage(followerActor, new AppendEntriesReply(
412 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
413 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only messages sent from here on are counted.
415 followerActor.underlyingActor().clear();
417 sendReplicate(actorContext, lastIndex + 1);
419 // Wait slightly longer than heartbeat duration
420 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
422 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
424 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
425 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// Both the original message and the heartbeat-triggered duplicate must carry the new entry.
427 assertEquals(1, allMessages.get(0).getEntries().size());
428 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
429 assertEquals(1, allMessages.get(1).getEntries().size());
430 assertEquals(lastIndex + 1, allMessages.get(1).getEntries().get(0).getIndex());
// Verifies that every SendHeartBeat handled after the heartbeat interval (100 ms here) has elapsed
// produces an AppendEntries: three heartbeats, each preceded by a 150 ms sleep, are expected to
// yield exactly three messages at the follower.
435 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
436 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
438 MockRaftActorContext actorContext = createActorContextWithFollower();
439 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
441 public FiniteDuration getHeartBeatInterval() {
442 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
447 actorContext.getTermInformation().update(term, "");
449 leader = new Leader(actorContext);
451 // Leader will send an immediate heartbeat - ignore it.
452 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
454 // The follower would normally reply - simulate that explicitly here.
455 long lastIndex = actorContext.getReplicatedLog().lastIndex();
456 leader.handleMessage(followerActor, new AppendEntriesReply(
457 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
458 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only the three heartbeats below are counted.
460 followerActor.underlyingActor().clear();
// Sleep longer than the configured heartbeat interval before each SendHeartBeat.
462 for (int i = 0; i < 3; i++) {
463 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
464 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
467 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
468 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
// Verifies that a Replicate handled directly after a heartbeat still produces its own
// AppendEntries: two messages are expected, the first (heartbeat) with no entries and the second
// carrying the newly replicated entry.
472 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
473 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
475 MockRaftActorContext actorContext = createActorContextWithFollower();
476 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
478 public FiniteDuration getHeartBeatInterval() {
479 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
484 actorContext.getTermInformation().update(term, "");
486 leader = new Leader(actorContext);
488 // Leader will send an immediate heartbeat - ignore it.
489 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
491 // The follower would normally reply - simulate that explicitly here.
492 long lastIndex = actorContext.getReplicatedLog().lastIndex();
493 leader.handleMessage(followerActor, new AppendEntriesReply(
494 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
495 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only messages sent from here on are counted.
497 followerActor.underlyingActor().clear();
// Sleep longer than the 100 ms heartbeat interval, then send a heartbeat immediately followed by
// a Replicate.
499 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
500 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
501 sendReplicate(actorContext, lastIndex + 1);
503 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
504 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
506 assertEquals(0, allMessages.get(0).getEntries().size());
507 assertEquals(1, allMessages.get(1).getEntries().size());
// Verifies that a leader built from createActorContext() (presumably a context without peers, as
// the test name indicates - confirm against that helper) commits a replicated entry immediately
// and sends ApplyState messages to the leader actor for every entry above lastApplied.
512 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
513 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
515 MockRaftActorContext actorContext = createActorContext();
517 leader = new Leader(actorContext);
519 actorContext.setLastApplied(0);
521 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
522 long term = actorContext.getTermInformation().getCurrentTerm();
523 ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
524 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
526 actorContext.getReplicatedLog().append(newEntry);
528 final Identifier id = new MockIdentifier("state-id");
529 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
530 new Replicate(leaderActor, id, newEntry, true));
532 // State should not change
533 assertTrue(raftBehavior instanceof Leader);
535 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
537 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
538 // one since lastApplied state is 0.
539 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
540 leaderActor, ApplyState.class);
541 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
// The ApplyState messages are expected in index order, starting at index 1 (lastApplied + 1).
543 for (int i = 0; i <= newLogIndex - 1; i++) {
544 ApplyState applyState = applyStateList.get(i);
545 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
546 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
// The final ApplyState must correspond to the entry just replicated and carry its identifier.
549 ApplyState last = applyStateList.get((int) newLogIndex - 1);
550 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
551 assertEquals("getIdentifier", id, last.getIdentifier());
// Verifies the leader's heartbeat behavior while a snapshot install to the follower is in
// progress: while the current chunk has not been acknowledged, a heartbeat results in an
// AppendEntries with no entries; once the chunk is marked as successfully sent, the next heartbeat
// results in an InstallSnapshot whose last included index equals the commit index.
555 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
556 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
558 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Sample state that is serialized below and used as the leader's snapshot bytes.
560 Map<String, String> leadersSnapshot = new HashMap<>();
561 leadersSnapshot.put("1", "A");
562 leadersSnapshot.put("2", "B");
563 leadersSnapshot.put("3", "C");
// Remove all entries from the leader's log, starting at index 0.
566 actorContext.getReplicatedLog().removeFrom(0);
568 final int commitIndex = 3;
569 final int snapshotIndex = 2;
570 final int snapshotTerm = 1;
572 // set the snapshot variables in replicatedlog
573 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
574 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
575 actorContext.setCommitIndex(commitIndex);
576 //set follower timeout to 2 mins, helps during debugging
577 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
579 leader = new Leader(actorContext);
// Put the follower at the very beginning of the log.
581 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
582 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
584 //update follower timestamp
585 leader.markFollowerActive(FOLLOWER_ID);
// Give the leader a snapshot and attach an install-snapshot state over the same bytes to the
// follower, simulating an install that is already under way.
587 ByteString bs = toByteString(leadersSnapshot);
588 leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
589 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
590 -1, null, null), ByteSource.wrap(bs.toByteArray())));
591 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
592 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
593 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
594 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
596 //send first chunk and no InstallSnapshotReply received yet
598 fts.incrementChunkIndex();
600 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
601 TimeUnit.MILLISECONDS);
603 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
605 AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
607 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
609 //InstallSnapshotReply received
610 fts.markSendStatus(true);
612 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
614 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
616 assertEquals(commitIndex, is.getLastIncludedIndex());
// Verifies that replicating a new entry while the follower is behind the leader's snapshot index
// leaves the behavior as Leader and puts the snapshot manager into the capturing state.
620 public void testSendAppendEntriesSnapshotScenario() throws Exception {
621 logStart("testSendAppendEntriesSnapshotScenario");
623 final MockRaftActorContext actorContext = createActorContextWithFollower();
625 Map<String, String> leadersSnapshot = new HashMap<>();
626 leadersSnapshot.put("1", "A");
627 leadersSnapshot.put("2", "B");
628 leadersSnapshot.put("3", "C");
// Remove all entries from the leader's log, starting at index 0.
631 actorContext.getReplicatedLog().removeFrom(0);
633 final int followersLastIndex = 2;
634 final int snapshotIndex = 3;
635 final int newEntryIndex = 4;
636 final int snapshotTerm = 1;
637 final int currentTerm = 2;
639 // set the snapshot variables in replicatedlog
640 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
641 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
642 actorContext.setCommitIndex(followersLastIndex);
644 leader = new Leader(actorContext);
646 // Leader will send an immediate heartbeat - ignore it.
647 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// New entry just past the snapshot index, in the current term.
650 SimpleReplicatedLogEntry entry =
651 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
652 new MockRaftActorContext.MockPayload("D"));
654 actorContext.getReplicatedLog().append(entry);
656 //update follower timestamp
657 leader.markFollowerActive(FOLLOWER_ID);
659 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
660 RaftActorBehavior raftBehavior = leader.handleMessage(
661 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
663 assertTrue(raftBehavior instanceof Leader);
665 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
// Verifies that, when the leader holds no snapshot, handling a Replicate starts a snapshot capture
// with the expected last-applied and last index/term values, and that a second Replicate handled
// while that capture is still in progress does not replace the CaptureSnapshot instance.
669 public void testInitiateInstallSnapshot() throws Exception {
670 logStart("testInitiateInstallSnapshot");
672 MockRaftActorContext actorContext = createActorContextWithFollower();
// Remove all entries from the leader's log, starting at index 0.
675 actorContext.getReplicatedLog().removeFrom(0);
677 final int followersLastIndex = 2;
678 final int snapshotIndex = 3;
679 final int newEntryIndex = 4;
680 final int snapshotTerm = 1;
681 final int currentTerm = 2;
683 // set the snapshot variables in replicatedlog
684 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
685 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
686 actorContext.setLastApplied(3);
687 actorContext.setCommitIndex(followersLastIndex);
689 leader = new Leader(actorContext);
691 // Leader will send an immediate heartbeat - ignore it.
692 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
694 // set the snapshot as absent and check if capture-snapshot is invoked.
695 leader.setSnapshot(null);
698 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
699 new MockRaftActorContext.MockPayload("D"));
701 actorContext.getReplicatedLog().append(entry);
703 //update follower timestamp
704 leader.markFollowerActive(FOLLOWER_ID);
706 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
708 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
710 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
// Last applied is the snapshot position (index 3, term 1); last index/term is the new entry (4, 2).
712 assertEquals(3, cs.getLastAppliedIndex());
713 assertEquals(1, cs.getLastAppliedTerm());
714 assertEquals(4, cs.getLastIndex());
715 assertEquals(2, cs.getLastTerm());
717 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
718 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
720 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
// Verifies that an AppendEntriesReply which forces a snapshot install makes the Leader start a single
// snapshot capture, that repeated triggers reuse that same capture, that persisting the capture sends an
// InstallSnapshot to the follower, and that a further forcing reply does not re-send the chunk.
724 public void testInitiateForceInstallSnapshot() throws Exception {
725 logStart("testInitiateForceInstallSnapshot");
727 MockRaftActorContext actorContext = createActorContextWithFollower();
729 final int followersLastIndex = 2;
730 final int snapshotIndex = -1;
731 final int newEntryIndex = 4;
732 final int snapshotTerm = -1;
733 final int currentTerm = 2;
735 // set the snapshot variables in replicatedlog
736 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
737 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
// lastApplied (3) is the value the captured snapshot is asserted to report further down.
738 actorContext.setLastApplied(3);
739 actorContext.setCommitIndex(followersLastIndex);
// Assumes removeFrom(0) drops every existing entry so the log holds only what is appended below
// - TODO confirm.
741 actorContext.getReplicatedLog().removeFrom(0);
// Keep the Optional<OutputStream> handed to the create-snapshot procedure; the test writes the
// snapshot bytes to it later.
743 AtomicReference<java.util.Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
744 actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
746 leader = new Leader(actorContext);
747 actorContext.setCurrentBehavior(leader);
749 // Leader will send an immediate heartbeat - ignore it.
750 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
752 // set the snapshot as absent and check if capture-snapshot is invoked.
753 leader.setSnapshot(null);
// Entries 0..3 are appended in term 1.
755 for (int i = 0; i < 4; i++) {
756 actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
757 new MockRaftActorContext.MockPayload("X" + i)));
// Entry 4 is appended in term 2; it becomes the log's last index/term checked below.
761 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
762 new MockRaftActorContext.MockPayload("D"));
764 actorContext.getReplicatedLog().append(entry);
766 //update follower timestamp
767 leader.markFollowerActive(FOLLOWER_ID);
769 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
770 // installed with a SendInstallSnapshot
771 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
773 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
// Last applied is entry 3 (term 1); last log entry is entry 4 (term 2).
775 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
776 assertEquals(3, cs.getLastAppliedIndex());
777 assertEquals(1, cs.getLastAppliedTerm());
778 assertEquals(4, cs.getLastIndex());
779 assertEquals(2, cs.getLastTerm());
781 assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
782 assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
784 MessageCollectorActor.clearMessages(followerActor);
786 // Sending Replicate message should not initiate another capture since the first is in progress.
787 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
788 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
790 // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
791 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
792 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
794 // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
795 final byte[] bytes = new byte[]{1, 2, 3};
// The same three bytes are written to the captured stream and passed to persist() as the snapshot state.
796 installSnapshotStream.get().get().write(bytes);
797 actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
798 Runtime.getRuntime().totalMemory());
799 MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
801 // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
802 MessageCollectorActor.clearMessages(followerActor);
803 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
// Gives the follower 200 (presumably milliseconds - confirm) to show that no InstallSnapshot arrives.
804 MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
// Verifies that handling SendInstallSnapshot keeps the Leader behavior and sends the follower an
// InstallSnapshot carrying the snapshot's last-included index/term and the leader's current term.
809 public void testInstallSnapshot() throws Exception {
810 logStart("testInstallSnapshot");
812 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Arbitrary state that is serialized into the snapshot bytes below.
814 Map<String, String> leadersSnapshot = new HashMap<>();
815 leadersSnapshot.put("1", "A");
816 leadersSnapshot.put("2", "B");
817 leadersSnapshot.put("3", "C");
820 actorContext.getReplicatedLog().removeFrom(0);
822 final int lastAppliedIndex = 3;
823 final int snapshotIndex = 2;
824 final int snapshotTerm = 1;
825 final int currentTerm = 2;
827 // set the snapshot variables in replicatedlog
828 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
829 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
830 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
831 actorContext.setCommitIndex(lastAppliedIndex);
832 actorContext.setLastApplied(lastAppliedIndex);
834 leader = new Leader(actorContext);
836 // Initial heartbeat.
837 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Reset the leader's view of the follower: nothing matched yet, next entry to send is index 0.
839 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
840 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
842 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
// The snapshot is built with lastAppliedIndex/snapshotTerm in both index/term argument pairs; those are
// the values the InstallSnapshot assertions below compare against.
843 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
844 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
846 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
847 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
849 assertTrue(raftBehavior instanceof Leader);
851 // check if installsnapshot gets called with the correct values.
853 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
854 InstallSnapshot.class);
856 assertNotNull(installSnapshot.getData());
857 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
858 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
860 assertEquals(currentTerm, installSnapshot.getTerm());
// Same scenario as testInstallSnapshot, but with no prior snapshot in the log (snapshot index/term of -1),
// the follower's next index set to -1, and the leader's log left as created: SendInstallSnapshot must still
// produce an InstallSnapshot carrying the snapshot's index/term and the current term.
864 public void testForceInstallSnapshot() throws Exception {
865 logStart("testForceInstallSnapshot");
867 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Arbitrary state that is serialized into the snapshot bytes below.
869 Map<String, String> leadersSnapshot = new HashMap<>();
870 leadersSnapshot.put("1", "A");
871 leadersSnapshot.put("2", "B");
872 leadersSnapshot.put("3", "C");
874 final int lastAppliedIndex = 3;
875 final int snapshotIndex = -1;
876 final int snapshotTerm = -1;
877 final int currentTerm = 2;
879 // set the snapshot variables in replicatedlog
880 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
881 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
882 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
883 actorContext.setCommitIndex(lastAppliedIndex);
884 actorContext.setLastApplied(lastAppliedIndex);
886 leader = new Leader(actorContext);
888 // Initial heartbeat.
889 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Unlike testInstallSnapshot, the follower's next index is -1 here rather than 0.
891 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
892 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
894 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
// snapshotTerm is -1 in this test, so the expected last-included term below is -1 as well.
895 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
896 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
898 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
899 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
901 assertTrue(raftBehavior instanceof Leader);
903 // check if installsnapshot gets called with the correct values.
905 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
906 InstallSnapshot.class);
908 assertNotNull(installSnapshot.getData());
909 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
910 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
912 assertEquals(currentTerm, installSnapshot.getTerm());
// Verifies that a successful InstallSnapshotReply for the last chunk leaves the Leader in place, clears
// the follower's install-snapshot state, moves the follower's match/next index to commitIndex and
// commitIndex + 1, and drops the snapshot held by the leader.
916 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
917 logStart("testHandleInstallSnapshotReplyLastChunk");
919 MockRaftActorContext actorContext = createActorContextWithFollower();
921 final int commitIndex = 3;
922 final int snapshotIndex = 2;
923 final int snapshotTerm = 1;
924 final int currentTerm = 2;
926 actorContext.setCommitIndex(commitIndex);
928 leader = new Leader(actorContext);
929 actorContext.setCurrentBehavior(leader);
// Reset the leader's view of the follower: nothing matched yet, next entry to send is index 0.
931 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
932 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
934 // Ignore initial heartbeat.
935 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
937 Map<String, String> leadersSnapshot = new HashMap<>();
938 leadersSnapshot.put("1", "A");
939 leadersSnapshot.put("2", "B");
940 leadersSnapshot.put("3", "C");
942 // set the snapshot variables in replicatedlog
944 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
945 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
946 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
948 ByteString bs = toByteString(leadersSnapshot);
// Give the leader a snapshot directly instead of going through a capture.
949 leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
950 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
951 -1, null, null), ByteSource.wrap(bs.toByteArray())));
// Attach an in-progress install state for the follower, built from the same bytes.
952 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
953 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
954 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
955 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
// Advance the state until its chunk index is the last chunk, so the reply below is for the final chunk.
// NOTE(review): a statement between the loop header and incrementChunkIndex() appears to be missing
// from this listing - confirm against the full source.
956 while (!fts.isLastChunk(fts.getChunkIndex())) {
958 fts.incrementChunkIndex();
962 actorContext.getReplicatedLog().removeFrom(0);
964 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
965 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
967 assertTrue(raftBehavior instanceof Leader);
969 assertEquals(1, leader.followerLogSize());
970 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
972 assertNull(fli.getInstallSnapshotState());
973 assertEquals(commitIndex, fli.getMatchIndex());
974 assertEquals(commitIndex + 1, fli.getNextIndex());
975 assertFalse(leader.hasSnapshot());
// Verifies chunk-by-chunk snapshot transfer: SendInstallSnapshot sends chunk 1 of 3, each successful
// InstallSnapshotReply triggers the next chunk, and a reply to the final chunk sends no further
// InstallSnapshot.
979 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
980 logStart("testSendSnapshotfromInstallSnapshotReply");
982 MockRaftActorContext actorContext = createActorContextWithFollower();
984 final int commitIndex = 3;
985 final int snapshotIndex = 2;
986 final int snapshotTerm = 1;
987 final int currentTerm = 2;
// Config with an overridden snapshot chunk size.
// NOTE(review): the body of the override is not visible in this listing; the assertions below expect the
// serialized snapshot to split into 3 chunks.
989 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
991 public int getSnapshotChunkSize() {
// Long heartbeat (9 s) and isolated-leader-check (10 s) intervals - presumably to keep periodic messages
// out of the way while chunks are exchanged; confirm.
995 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
996 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
998 actorContext.setConfigParams(configParams);
999 actorContext.setCommitIndex(commitIndex);
1001 leader = new Leader(actorContext);
1002 actorContext.setCurrentBehavior(leader);
1004 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1005 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1007 Map<String, String> leadersSnapshot = new HashMap<>();
1008 leadersSnapshot.put("1", "A");
1009 leadersSnapshot.put("2", "B");
1010 leadersSnapshot.put("3", "C");
1012 // set the snapshot variables in replicatedlog
1013 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1014 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1015 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1017 ByteString bs = toByteString(leadersSnapshot);
// NOTE(review): the trailing arguments of this Snapshot.create call are not visible in this listing.
1018 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1019 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1022 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1024 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1025 InstallSnapshot.class);
1027 assertEquals(1, installSnapshot.getChunkIndex());
1028 assertEquals(3, installSnapshot.getTotalChunks());
// Clear collected messages before each reply so the next expectFirstMatching sees only the new chunk.
1030 followerActor.underlyingActor().clear();
1031 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1032 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1034 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1036 assertEquals(2, installSnapshot.getChunkIndex());
1037 assertEquals(3, installSnapshot.getTotalChunks());
1039 followerActor.underlyingActor().clear();
1040 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1041 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
// A third InstallSnapshot must arrive; its chunk index is not asserted, only reused in the reply below.
1043 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1045 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1046 followerActor.underlyingActor().clear();
1047 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1048 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
// getFirstMatching (not expectFirstMatching) is used because the expected outcome is "no message".
1050 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1052 assertNull(installSnapshot);
// Verifies that after a failed InstallSnapshotReply carrying chunk index -1, the next heartbeat makes the
// Leader send chunk 1 of 3 again.
1057 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
1058 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1060 MockRaftActorContext actorContext = createActorContextWithFollower();
1062 final int commitIndex = 3;
1063 final int snapshotIndex = 2;
1064 final int snapshotTerm = 1;
1065 final int currentTerm = 2;
// Replaces the context's config with one that overrides the snapshot chunk size.
// NOTE(review): the body of the override is not visible in this listing; the assertions below expect the
// serialized snapshot to split into 3 chunks.
1067 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1069 public int getSnapshotChunkSize() {
1074 actorContext.setCommitIndex(commitIndex);
1076 leader = new Leader(actorContext);
1078 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1079 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1081 Map<String, String> leadersSnapshot = new HashMap<>();
1082 leadersSnapshot.put("1", "A");
1083 leadersSnapshot.put("2", "B");
1084 leadersSnapshot.put("3", "C");
1086 // set the snapshot variables in replicatedlog
1087 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1088 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1089 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1091 ByteString bs = toByteString(leadersSnapshot);
// NOTE(review): the trailing arguments of this Snapshot.create call are not visible in this listing.
1092 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1093 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
// NOTE(review): the reason for this fixed one-second pause is not evident from the visible code - confirm
// it is still required, since it adds a second to every run.
1096 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1097 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1099 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1100 InstallSnapshot.class);
1102 assertEquals(1, installSnapshot.getChunkIndex());
1103 assertEquals(3, installSnapshot.getTotalChunks());
1105 followerActor.underlyingActor().clear();
// Reply with chunk index -1 and success == false.
1107 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1108 FOLLOWER_ID, -1, false));
// Wait one heartbeat interval before delivering the heartbeat tick by hand.
1110 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1111 TimeUnit.MILLISECONDS);
1113 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1115 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
// The same first chunk is expected again, not chunk 2.
1117 assertEquals(1, installSnapshot.getChunkIndex());
1118 assertEquals(3, installSnapshot.getTotalChunks());
// Verifies the last-chunk hash code carried by InstallSnapshot: the first chunk carries
// LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE, and the second chunk carries
// Arrays.hashCode of the first chunk's data.
1122 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1123 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1125 MockRaftActorContext actorContext = createActorContextWithFollower();
1127 final int commitIndex = 3;
1128 final int snapshotIndex = 2;
1129 final int snapshotTerm = 1;
1130 final int currentTerm = 2;
// Replaces the context's config with one that overrides the snapshot chunk size.
// NOTE(review): the body of the override is not visible in this listing; the assertions below expect the
// serialized snapshot to split into 3 chunks.
1132 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1134 public int getSnapshotChunkSize() {
1139 actorContext.setCommitIndex(commitIndex);
1141 leader = new Leader(actorContext);
1143 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1144 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1146 Map<String, String> leadersSnapshot = new HashMap<>();
1147 leadersSnapshot.put("1", "A");
1148 leadersSnapshot.put("2", "B");
1149 leadersSnapshot.put("3", "C");
1151 // set the snapshot variables in replicatedlog
1152 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1153 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1154 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1156 ByteString bs = toByteString(leadersSnapshot);
// NOTE(review): the trailing arguments of this Snapshot.create call are not visible in this listing.
1157 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1158 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1161 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1163 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1164 InstallSnapshot.class);
1166 assertEquals(1, installSnapshot.getChunkIndex());
1167 assertEquals(3, installSnapshot.getTotalChunks());
// There is no previous chunk yet, so the initial constant is expected.
1168 assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1169 installSnapshot.getLastChunkHashCode().get().intValue());
// Remember the first chunk's hash before clearing messages; the second chunk must report it.
1171 final int hashCode = Arrays.hashCode(installSnapshot.getData());
1173 followerActor.underlyingActor().clear();
1175 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1176 FOLLOWER_ID, 1, true));
1178 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1180 assertEquals(2, installSnapshot.getChunkIndex());
1181 assertEquals(3, installSnapshot.getTotalChunks());
1182 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
// Walks a LeaderInstallSnapshotState over a serialized three-entry map in 50-byte chunks and checks each
// chunk's size, the running chunk index, and the final total chunk count.
// NOTE(review): the declaration and increment of chunkIndex are not visible in this listing.
1186 public void testLeaderInstallSnapshotState() throws IOException {
1187 logStart("testLeaderInstallSnapshotState");
1189 Map<String, String> leadersSnapshot = new HashMap<>();
1190 leadersSnapshot.put("1", "A");
1191 leadersSnapshot.put("2", "B");
1192 leadersSnapshot.put("3", "C");
1194 ByteString bs = toByteString(leadersSnapshot);
1195 byte[] barray = bs.toByteArray();
// Chunk size 50 matches the step used by the loop below.
1197 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1198 fts.setSnapshotBytes(ByteSource.wrap(barray));
1200 assertEquals(bs.size(), barray.length);
// i is the byte offset where the current chunk starts; length is the exclusive end offset, clamped to
// the array size so the final chunk may be shorter than 50 bytes.
1203 for (int i = 0; i < barray.length; i = i + 50) {
1204 int length = i + 50;
1207 if (i + 50 > barray.length) {
1208 length = barray.length;
1211 byte[] chunk = fts.getNextChunk();
1212 assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1213 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1215 fts.markSendStatus(true);
// Only advance while more chunks remain.
1216 if (!fts.isLastChunk(chunkIndex)) {
1217 fts.incrementChunkIndex();
1221 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
// Creates a new Leader behavior bound to the given actor context.
1226 protected Leader createBehavior(final RaftActorContext actorContext) {
1227 return new Leader(actorContext);
// Creates the default context for these tests, backed by leaderActor.
1231 protected MockRaftActorContext createActorContext() {
1232 return createActorContext(leaderActor);
// Creates a context with the id LEADER_ID, backed by the given actor.
1236 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1237 return createActorContext(LEADER_ID, actorRef);
// Builds a MockRaftActorContext for the given member id and actor, configured with a 50 ms heartbeat
// interval, an election timeout factor of 100000, and this test's payloadVersion.
// NOTE(review): the return statement is not visible in this listing.
1240 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1241 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1242 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
// Very large factor - presumably so no election timeout fires during a test; confirm.
1243 configParams.setElectionTimeoutFactor(100000);
1244 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1245 context.setConfigParams(configParams);
1246 context.setPayloadVersion(payloadVersion);
// Creates the default leader context and registers followerActor as its only peer, keyed by FOLLOWER_ID.
1250 private MockRaftActorContext createActorContextWithFollower() {
1251 MockRaftActorContext actorContext = createActorContext();
1252 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1253 followerActor.path().toString()).build());
1254 return actorContext;
// Creates a context for FOLLOWER_ID backed by followerActor, with leaderActor registered as its only peer
// under LEADER_ID.
1257 private MockRaftActorContext createFollowerActorContextWithLeader() {
1258 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// This fresh config replaces the one installed by createActorContext, so the 50 ms heartbeat interval set
// there does not apply to the follower; only the election timeout factor is set explicitly here.
1259 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1260 followerConfig.setElectionTimeoutFactor(10000);
1261 followerActorContext.setConfigParams(followerConfig);
1262 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1263 return followerActorContext;
// Verifies the first exchange when a Leader is created with a commit index (1) below its last log index:
// the initial AppendEntries carries leaderCommit -1, no entries and prevLogIndex 0, and the follower's
// reply reports last index 2 in term 1.
1267 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1268 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1270 final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1272 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Install a real Follower behavior on followerActor so it answers the leader's AppendEntries.
1274 Follower follower = new Follower(followerActorContext);
1275 followerActor.underlyingActor().setBehavior(follower);
1276 followerActorContext.setCurrentBehavior(follower);
// createActorContextWithFollower() already registered this same peer; it is set again here.
1278 Map<String, String> peerAddresses = new HashMap<>();
1279 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1281 leaderActorContext.setPeerAddresses(peerAddresses);
1283 leaderActorContext.getReplicatedLog().removeFrom(0);
// createEntries(0, 3, 1): presumably indexes 0..2, all in term 1 - the reply assertions below expect last
// index 2 and term 1.
1286 leaderActorContext.setReplicatedLog(
1287 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1289 leaderActorContext.setCommitIndex(1);
1291 followerActorContext.getReplicatedLog().removeFrom(0);
1293 // follower too has the exact same log entries and has the same commit index
1294 followerActorContext.setReplicatedLog(
1295 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1297 followerActorContext.setCommitIndex(1);
1299 leader = new Leader(leaderActorContext);
1300 leaderActorContext.setCurrentBehavior(leader);
1302 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1304 assertEquals(-1, appendEntries.getLeaderCommit());
1305 assertEquals(0, appendEntries.getEntries().size());
1306 assertEquals(0, appendEntries.getPrevLogIndex());
1308 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1309 leaderActor, AppendEntriesReply.class);
1311 assertEquals(2, appendEntriesReply.getLogLastIndex());
1312 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): the two assertions below repeat the two above verbatim, and the "next index" comment does
// not match what they check (log last index/term).
1314 // follower returns its next index
1315 assertEquals(2, appendEntriesReply.getLogLastIndex());
1316 assertEquals(1, appendEntriesReply.getLogLastTerm());
// Verifies the exchange when the follower's commit index (2) is ahead of the leader's (1) on identical
// logs: the initial AppendEntries carries leaderCommit -1 and prevLogIndex 0; after the leader processes
// the follower's reply and a heartbeat tick, the next AppendEntries carries leaderCommit 2 and
// prevLogIndex 2, and the follower's commit index is still 2.
1322 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1323 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
// The leader context is created without peers here; the follower is registered a few lines below.
1325 final MockRaftActorContext leaderActorContext = createActorContext();
1327 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1328 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
// Install a real Follower behavior on followerActor so it answers the leader's AppendEntries.
1330 Follower follower = new Follower(followerActorContext);
1331 followerActor.underlyingActor().setBehavior(follower);
1332 followerActorContext.setCurrentBehavior(follower);
1334 Map<String, String> leaderPeerAddresses = new HashMap<>();
1335 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1337 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1339 leaderActorContext.getReplicatedLog().removeFrom(0);
// createEntries(0, 3, 1): presumably indexes 0..2, all in term 1 - the reply assertions below expect last
// index 2 and term 1.
1341 leaderActorContext.setReplicatedLog(
1342 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1344 leaderActorContext.setCommitIndex(1);
1346 followerActorContext.getReplicatedLog().removeFrom(0);
1348 followerActorContext.setReplicatedLog(
1349 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1351 // follower has the same log entries but its commit index > leaders commit index
1352 followerActorContext.setCommitIndex(2);
1354 leader = new Leader(leaderActorContext);
1356 // Initial heartbeat
1357 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1359 assertEquals(-1, appendEntries.getLeaderCommit());
1360 assertEquals(0, appendEntries.getEntries().size());
1361 assertEquals(0, appendEntries.getPrevLogIndex());
1363 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1364 leaderActor, AppendEntriesReply.class);
1366 assertEquals(2, appendEntriesReply.getLogLastIndex());
1367 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): this installs the *follower* behavior on leaderActor, whereas the
// testHandleAppendEntriesReplyFailure* tests in this class install `leader` at the equivalent point.
// Confirm whether this is intentional before changing it; it affects how later replies are handled.
1369 leaderActor.underlyingActor().setBehavior(follower);
// Feed the collected reply to the leader directly on the test thread.
1370 leader.handleMessage(followerActor, appendEntriesReply);
1372 leaderActor.underlyingActor().clear();
1373 followerActor.underlyingActor().clear();
// Wait one heartbeat interval before delivering the heartbeat tick by hand.
1375 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1376 TimeUnit.MILLISECONDS);
1378 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1380 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1382 assertEquals(2, appendEntries.getLeaderCommit());
1383 assertEquals(0, appendEntries.getEntries().size());
1384 assertEquals(2, appendEntries.getPrevLogIndex());
1386 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1388 assertEquals(2, appendEntriesReply.getLogLastIndex());
1389 assertEquals(1, appendEntriesReply.getLogLastTerm());
1391 assertEquals(2, followerActorContext.getCommitIndex());
// Verifies catch-up when the follower's log is a shorter copy of the leader's: after the leader processes
// the follower's reply to the initial AppendEntries, it sends the two missing entries (indexes 1 and 2),
// and the follower applies both and ends with commit index 2 and last index 2.
1397 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1398 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1400 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// 1000-second heartbeat interval - presumably so no periodic heartbeat interferes with the message
// sequence asserted below; confirm.
1401 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1402 new FiniteDuration(1000, TimeUnit.SECONDS));
// Leader log: createEntries(0, 3, 1) - presumably indexes 0..2 in term 1; indexes 1 and 2 are read back
// below.
1404 leaderActorContext.setReplicatedLog(
1405 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1406 long leaderCommitIndex = 2;
1407 leaderActorContext.setCommitIndex(leaderCommitIndex);
1408 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the entries the follower is missing, for the data comparisons below.
1410 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1411 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1413 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// Follower log: createEntries(0, 1, 1) - presumably just index 0 in term 1.
1415 followerActorContext.setReplicatedLog(
1416 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1417 followerActorContext.setCommitIndex(0);
1418 followerActorContext.setLastApplied(0);
1420 Follower follower = new Follower(followerActorContext);
1421 followerActor.underlyingActor().setBehavior(follower);
1423 leader = new Leader(leaderActorContext);
// Collect the initial AppendEntries and the follower's reply to it before clearing both collectors.
1425 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1426 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1427 AppendEntriesReply.class);
1429 MessageCollectorActor.clearMessages(followerActor);
1430 MessageCollectorActor.clearMessages(leaderActor);
1432 // Verify initial AppendEntries sent.
1433 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1434 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1435 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1437 leaderActor.underlyingActor().setBehavior(leader);
// Feed the collected reply to the leader directly on the test thread.
1439 leader.handleMessage(followerActor, appendEntriesReply);
// Wait for the follower's reply to the follow-up AppendEntries before inspecting that message.
1441 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1442 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1444 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1445 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1446 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1448 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1449 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1450 appendEntries.getEntries().get(0).getData());
1451 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1452 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1453 appendEntries.getEntries().get(1).getData());
1455 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1456 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
// The follower must emit one ApplyState per newly received entry.
1458 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1460 ApplyState applyState = applyStateList.get(0);
1461 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1462 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1463 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1464 applyState.getReplicatedLogEntry().getData());
1466 applyState = applyStateList.get(1);
1467 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1468 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1469 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1470 applyState.getReplicatedLogEntry().getData());
1472 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1473 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
// Verifies catch-up when the follower's log is empty: after the leader processes the follower's reply to
// the initial AppendEntries, it sends both of its entries (indexes 0 and 1) with prevLogIndex -1, and the
// follower applies both and ends with commit index 1 and last index 1.
1477 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1478 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1480 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// 1000-second heartbeat interval - presumably so no periodic heartbeat interferes with the message
// sequence asserted below; confirm.
1481 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1482 new FiniteDuration(1000, TimeUnit.SECONDS));
// Leader log: createEntries(0, 2, 1) - presumably indexes 0..1 in term 1; both are read back below.
1484 leaderActorContext.setReplicatedLog(
1485 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1486 long leaderCommitIndex = 1;
1487 leaderActorContext.setCommitIndex(leaderCommitIndex);
1488 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the leader's entries for the data comparisons below.
1490 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1491 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1493 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// Follower starts with a log built without any entries and nothing committed or applied.
1495 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1496 followerActorContext.setCommitIndex(-1);
1497 followerActorContext.setLastApplied(-1);
1499 Follower follower = new Follower(followerActorContext);
1500 followerActor.underlyingActor().setBehavior(follower);
1501 followerActorContext.setCurrentBehavior(follower);
1503 leader = new Leader(leaderActorContext);
// Collect the initial AppendEntries and the follower's reply to it before clearing both collectors.
1505 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1506 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1507 AppendEntriesReply.class);
1509 MessageCollectorActor.clearMessages(followerActor);
1510 MessageCollectorActor.clearMessages(leaderActor);
1512 // Verify initial AppendEntries sent; it carries leaderCommit -1 rather than the leader's commit index (1).
1513 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1514 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1515 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1517 leaderActor.underlyingActor().setBehavior(leader);
1518 leaderActorContext.setCurrentBehavior(leader);
// Feed the collected reply to the leader directly on the test thread.
1520 leader.handleMessage(followerActor, appendEntriesReply);
// Wait for the follower's reply to the follow-up AppendEntries before inspecting that message.
1522 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1523 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1525 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1526 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1527 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1529 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1530 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1531 appendEntries.getEntries().get(0).getData());
1532 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1533 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1534 appendEntries.getEntries().get(1).getData());
1536 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1537 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
// The follower must emit one ApplyState per newly received entry.
1539 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1541 ApplyState applyState = applyStateList.get(0);
1542 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1543 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1544 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1545 applyState.getReplicatedLogEntry().getData());
1547 applyState = applyStateList.get(1);
1548 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1549 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1550 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1551 applyState.getReplicatedLogEntry().getData());
1553 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1554 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1558 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1559 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1561 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1562 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1563 new FiniteDuration(1000, TimeUnit.SECONDS));
1565 leaderActorContext.setReplicatedLog(
1566 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1567 long leaderCommitIndex = 1;
1568 leaderActorContext.setCommitIndex(leaderCommitIndex);
1569 leaderActorContext.setLastApplied(leaderCommitIndex);
1571 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1572 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1574 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1576 followerActorContext.setReplicatedLog(
1577 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1578 followerActorContext.setCommitIndex(-1);
1579 followerActorContext.setLastApplied(-1);
1581 Follower follower = new Follower(followerActorContext);
1582 followerActor.underlyingActor().setBehavior(follower);
1583 followerActorContext.setCurrentBehavior(follower);
1585 leader = new Leader(leaderActorContext);
1587 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1588 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1589 AppendEntriesReply.class);
1591 MessageCollectorActor.clearMessages(followerActor);
1592 MessageCollectorActor.clearMessages(leaderActor);
1594 // Verify initial AppendEntries sent with the leader's current commit index.
1595 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1596 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1597 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1599 leaderActor.underlyingActor().setBehavior(leader);
1600 leaderActorContext.setCurrentBehavior(leader);
1602 leader.handleMessage(followerActor, appendEntriesReply);
1604 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1605 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1607 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1608 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1609 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1611 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1612 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1613 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1614 appendEntries.getEntries().get(0).getData());
1615 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1616 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1617 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1618 appendEntries.getEntries().get(1).getData());
1620 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1621 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1623 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1625 ApplyState applyState = applyStateList.get(0);
1626 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1627 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1628 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1629 applyState.getReplicatedLogEntry().getData());
1631 applyState = applyStateList.get(1);
1632 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1633 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1634 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1635 applyState.getReplicatedLogEntry().getData());
1637 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1638 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1639 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1643 public void testHandleAppendEntriesReplyWithNewerTerm() {
1644 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1646 MockRaftActorContext leaderActorContext = createActorContext();
1647 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1648 new FiniteDuration(10000, TimeUnit.SECONDS));
1650 leaderActorContext.setReplicatedLog(
1651 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1653 leader = new Leader(leaderActorContext);
1654 leaderActor.underlyingActor().setBehavior(leader);
1655 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1657 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1658 AppendEntriesReply.class);
1660 assertEquals(false, appendEntriesReply.isSuccess());
1661 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1663 MessageCollectorActor.clearMessages(leaderActor);
1667 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1668 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1670 MockRaftActorContext leaderActorContext = createActorContext();
1671 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1672 new FiniteDuration(10000, TimeUnit.SECONDS));
1674 leaderActorContext.setReplicatedLog(
1675 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1676 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1678 leader = new Leader(leaderActorContext);
1679 leaderActor.underlyingActor().setBehavior(leader);
1680 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1682 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1683 AppendEntriesReply.class);
1685 assertEquals(false, appendEntriesReply.isSuccess());
1686 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1688 MessageCollectorActor.clearMessages(leaderActor);
1692 public void testHandleAppendEntriesReplySuccess() throws Exception {
1693 logStart("testHandleAppendEntriesReplySuccess");
1695 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1697 leaderActorContext.setReplicatedLog(
1698 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1700 leaderActorContext.setCommitIndex(1);
1701 leaderActorContext.setLastApplied(1);
1702 leaderActorContext.getTermInformation().update(1, "leader");
1704 leader = new Leader(leaderActorContext);
1706 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1708 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1709 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1711 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1713 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1715 assertEquals(RaftState.Leader, raftActorBehavior.state());
1717 assertEquals(2, leaderActorContext.getCommitIndex());
1719 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1720 leaderActor, ApplyJournalEntries.class);
1722 assertEquals(2, leaderActorContext.getLastApplied());
1724 assertEquals(2, applyJournalEntries.getToIndex());
1726 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1729 assertEquals(1,applyStateList.size());
1731 ApplyState applyState = applyStateList.get(0);
1733 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1735 assertEquals(2, followerInfo.getMatchIndex());
1736 assertEquals(3, followerInfo.getNextIndex());
1737 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1738 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1742 public void testHandleAppendEntriesReplyUnknownFollower() {
1743 logStart("testHandleAppendEntriesReplyUnknownFollower");
1745 MockRaftActorContext leaderActorContext = createActorContext();
1747 leader = new Leader(leaderActorContext);
1749 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1751 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1753 assertEquals(RaftState.Leader, raftActorBehavior.state());
1757 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1758 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1760 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1761 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1762 new FiniteDuration(1000, TimeUnit.SECONDS));
1763 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1765 leaderActorContext.setReplicatedLog(
1766 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1767 long leaderCommitIndex = 3;
1768 leaderActorContext.setCommitIndex(leaderCommitIndex);
1769 leaderActorContext.setLastApplied(leaderCommitIndex);
1771 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1772 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1773 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1774 final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1776 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1778 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1779 followerActorContext.setCommitIndex(-1);
1780 followerActorContext.setLastApplied(-1);
1782 Follower follower = new Follower(followerActorContext);
1783 followerActor.underlyingActor().setBehavior(follower);
1784 followerActorContext.setCurrentBehavior(follower);
1786 leader = new Leader(leaderActorContext);
1788 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1789 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1790 AppendEntriesReply.class);
1792 MessageCollectorActor.clearMessages(followerActor);
1793 MessageCollectorActor.clearMessages(leaderActor);
1795 // Verify initial AppendEntries sent with the leader's current commit index.
1796 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1797 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1798 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1800 leaderActor.underlyingActor().setBehavior(leader);
1801 leaderActorContext.setCurrentBehavior(leader);
1803 leader.handleMessage(followerActor, appendEntriesReply);
1805 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1806 AppendEntries.class, 2);
1807 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1809 appendEntries = appendEntriesList.get(0);
1810 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1811 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1812 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1814 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1815 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1816 appendEntries.getEntries().get(0).getData());
1817 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1818 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1819 appendEntries.getEntries().get(1).getData());
1821 appendEntries = appendEntriesList.get(1);
1822 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1823 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1824 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1826 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1827 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1828 appendEntries.getEntries().get(0).getData());
1829 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1830 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1831 appendEntries.getEntries().get(1).getData());
1833 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1834 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1836 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1838 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1839 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1843 public void testHandleRequestVoteReply() {
1844 logStart("testHandleRequestVoteReply");
1846 MockRaftActorContext leaderActorContext = createActorContext();
1848 leader = new Leader(leaderActorContext);
1850 // Should be a no-op.
1851 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1852 new RequestVoteReply(1, true));
1854 assertEquals(RaftState.Leader, raftActorBehavior.state());
1856 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1858 assertEquals(RaftState.Leader, raftActorBehavior.state());
1862 public void testIsolatedLeaderCheckNoFollowers() {
1863 logStart("testIsolatedLeaderCheckNoFollowers");
1865 MockRaftActorContext leaderActorContext = createActorContext();
1867 leader = new Leader(leaderActorContext);
1868 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1869 assertTrue(newBehavior instanceof Leader);
1873 public void testIsolatedLeaderCheckNoVotingFollowers() {
1874 logStart("testIsolatedLeaderCheckNoVotingFollowers");
1876 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1877 Follower follower = new Follower(followerActorContext);
1878 followerActor.underlyingActor().setBehavior(follower);
1880 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1881 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1882 new FiniteDuration(1000, TimeUnit.SECONDS));
1883 leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1885 leader = new Leader(leaderActorContext);
1886 leader.getFollower(FOLLOWER_ID).markFollowerActive();
1887 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1888 assertTrue("Expected Leader", newBehavior instanceof Leader);
1891 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
1892 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1893 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1895 MockRaftActorContext leaderActorContext = createActorContext();
1897 Map<String, String> peerAddresses = new HashMap<>();
1898 peerAddresses.put("follower-1", followerActor1.path().toString());
1899 peerAddresses.put("follower-2", followerActor2.path().toString());
1901 leaderActorContext.setPeerAddresses(peerAddresses);
1902 leaderActorContext.setRaftPolicy(raftPolicy);
1904 leader = new Leader(leaderActorContext);
1906 leader.markFollowerActive("follower-1");
1907 leader.markFollowerActive("follower-2");
1908 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1909 assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1911 // kill 1 follower and verify if that got killed
1912 final JavaTestKit probe = new JavaTestKit(getSystem());
1913 probe.watch(followerActor1);
1914 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1915 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1916 assertEquals(termMsg1.getActor(), followerActor1);
1918 leader.markFollowerInActive("follower-1");
1919 leader.markFollowerActive("follower-2");
1920 newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1921 assertTrue("Behavior not instance of Leader when majority of followers are active",
1922 newBehavior instanceof Leader);
1924 // kill 2nd follower and leader should change to Isolated leader
1925 followerActor2.tell(PoisonPill.getInstance(), null);
1926 probe.watch(followerActor2);
1927 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1928 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1929 assertEquals(termMsg2.getActor(), followerActor2);
1931 leader.markFollowerInActive("follower-2");
1932 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1936 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1937 logStart("testIsolatedLeaderCheckTwoFollowers");
1939 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1941 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1942 newBehavior instanceof IsolatedLeader);
1946 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1947 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1949 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1951 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1952 newBehavior instanceof Leader);
1956 public void testLaggingFollowerStarvation() throws Exception {
1957 logStart("testLaggingFollowerStarvation");
1959 String leaderActorId = actorFactory.generateActorId("leader");
1960 String follower1ActorId = actorFactory.generateActorId("follower");
1961 String follower2ActorId = actorFactory.generateActorId("follower");
1963 final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1964 final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1966 MockRaftActorContext leaderActorContext =
1967 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1969 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1970 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1971 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1973 leaderActorContext.setConfigParams(configParams);
1975 leaderActorContext.setReplicatedLog(
1976 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1978 Map<String, String> peerAddresses = new HashMap<>();
1979 peerAddresses.put(follower1ActorId,
1980 follower1Actor.path().toString());
1981 peerAddresses.put(follower2ActorId,
1982 follower2Actor.path().toString());
1984 leaderActorContext.setPeerAddresses(peerAddresses);
1985 leaderActorContext.getTermInformation().update(1, leaderActorId);
1987 leader = createBehavior(leaderActorContext);
1989 leaderActor.underlyingActor().setBehavior(leader);
1991 for (int i = 1; i < 6; i++) {
1992 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1993 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
1994 new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1995 assertTrue(newBehavior == leader);
1996 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1999 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
2000 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
2002 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
2003 heartbeats.size() > 1);
2005 // Check if follower-2 got AppendEntries during this time and was not starved
2006 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
2008 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
2009 appendEntries.size() > 1);
2013 public void testReplicationConsensusWithNonVotingFollower() {
2014 logStart("testReplicationConsensusWithNonVotingFollower");
2016 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2017 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2018 new FiniteDuration(1000, TimeUnit.SECONDS));
2020 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2021 leaderActorContext.setCommitIndex(-1);
2022 leaderActorContext.setLastApplied(-1);
2024 String nonVotingFollowerId = "nonvoting-follower";
2025 TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
2026 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
2028 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2029 VotingState.NON_VOTING);
2031 leader = new Leader(leaderActorContext);
2032 leaderActorContext.setCurrentBehavior(leader);
2034 // Ignore initial heartbeats
2035 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2036 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2038 MessageCollectorActor.clearMessages(followerActor);
2039 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2040 MessageCollectorActor.clearMessages(leaderActor);
2042 // Send a Replicate message and wait for AppendEntries.
2043 sendReplicate(leaderActorContext, 0);
2045 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2046 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2048 // Send reply only from the voting follower and verify consensus via ApplyState.
2049 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2051 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2053 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2055 MessageCollectorActor.clearMessages(followerActor);
2056 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2057 MessageCollectorActor.clearMessages(leaderActor);
2059 // Send another Replicate message
2060 sendReplicate(leaderActorContext, 1);
2062 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2063 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2064 AppendEntries.class);
2065 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2066 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2068 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2069 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2071 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2073 // Send reply from the voting follower and verify consensus.
2074 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2076 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2080 public void testTransferLeadershipWithFollowerInSync() {
2081 logStart("testTransferLeadershipWithFollowerInSync");
2083 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2084 leaderActorContext.setLastApplied(-1);
2085 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2086 new FiniteDuration(1000, TimeUnit.SECONDS));
2087 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2089 leader = new Leader(leaderActorContext);
2090 leaderActorContext.setCurrentBehavior(leader);
2092 // Initial heartbeat
2093 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2094 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2095 MessageCollectorActor.clearMessages(followerActor);
2097 sendReplicate(leaderActorContext, 0);
2098 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2100 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2101 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2102 MessageCollectorActor.clearMessages(followerActor);
2104 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2105 leader.transferLeadership(mockTransferCohort);
2107 verify(mockTransferCohort, never()).transferComplete();
2108 doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2109 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2110 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2112 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2113 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2115 // Leader should force an election timeout
2116 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2118 verify(mockTransferCohort).transferComplete();
2122 public void testTransferLeadershipWithEmptyLog() {
2123 logStart("testTransferLeadershipWithEmptyLog");
2125 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2126 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2127 new FiniteDuration(1000, TimeUnit.SECONDS));
2128 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2130 leader = new Leader(leaderActorContext);
2131 leaderActorContext.setCurrentBehavior(leader);
2133 // Initial heartbeat
2134 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2135 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2136 MessageCollectorActor.clearMessages(followerActor);
2138 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2139 doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2140 leader.transferLeadership(mockTransferCohort);
2142 verify(mockTransferCohort, never()).transferComplete();
2143 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2144 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2146 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2147 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2149 // Leader should force an election timeout
2150 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2152 verify(mockTransferCohort).transferComplete();
2156 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2157 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2159 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2160 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2161 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2163 leader = new Leader(leaderActorContext);
2164 leaderActorContext.setCurrentBehavior(leader);
2166 // Initial heartbeat
2167 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2168 MessageCollectorActor.clearMessages(followerActor);
2170 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2171 doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2172 leader.transferLeadership(mockTransferCohort);
2174 verify(mockTransferCohort, never()).transferComplete();
2176 // Sync up the follower.
2177 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2178 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2179 MessageCollectorActor.clearMessages(followerActor);
2181 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2182 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2183 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2184 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2185 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2187 // Leader should force an election timeout
2188 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2190 verify(mockTransferCohort).transferComplete();
2194 public void testTransferLeadershipWithFollowerSyncTimeout() {
2195 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2197 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2198 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2199 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2200 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2201 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2203 leader = new Leader(leaderActorContext);
2204 leaderActorContext.setCurrentBehavior(leader);
2206 // Initial heartbeat
2207 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2208 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2209 MessageCollectorActor.clearMessages(followerActor);
2211 sendReplicate(leaderActorContext, 0);
2212 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2214 MessageCollectorActor.clearMessages(followerActor);
2216 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2217 leader.transferLeadership(mockTransferCohort);
2219 verify(mockTransferCohort, never()).transferComplete();
2221 // Send heartbeats to time out the transfer.
2222 for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2223 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2224 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2225 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2228 verify(mockTransferCohort).abortTransfer();
2229 verify(mockTransferCohort, never()).transferComplete();
2230 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2234 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2235 ActorRef actorRef, RaftRPC rpc) throws Exception {
2236 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2237 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2240 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2242 private final long electionTimeOutIntervalMillis;
2243 private final int snapshotChunkSize;
2245 MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2247 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2248 this.snapshotChunkSize = snapshotChunkSize;
2252 public FiniteDuration getElectionTimeOutInterval() {
2253 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2257 public int getSnapshotChunkSize() {
2258 return snapshotChunkSize;