2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.behaviors;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.protobuf.ByteString;
26 import akka.testkit.TestActorRef;
27 import akka.testkit.javadsl.TestKit;
28 import com.google.common.io.ByteSource;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.io.OutputStream;
32 import java.util.Arrays;
33 import java.util.HashMap;
34 import java.util.List;
36 import java.util.Optional;
37 import java.util.OptionalInt;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.apache.commons.lang3.SerializationUtils;
41 import org.junit.After;
42 import org.junit.Test;
43 import org.opendaylight.controller.cluster.messaging.MessageSlice;
44 import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
45 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
46 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
47 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
48 import org.opendaylight.controller.cluster.raft.RaftActorContext;
49 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
50 import org.opendaylight.controller.cluster.raft.RaftState;
51 import org.opendaylight.controller.cluster.raft.RaftVersions;
52 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
53 import org.opendaylight.controller.cluster.raft.VotingState;
54 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
55 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
56 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
57 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
58 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
59 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
60 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
61 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
62 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
63 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
64 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
65 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
66 import org.opendaylight.controller.cluster.raft.messages.Payload;
67 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
68 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
69 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
70 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
71 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
72 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
73 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
74 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
75 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
76 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
77 import org.opendaylight.yangtools.concepts.Identifier;
78 import scala.concurrent.duration.FiniteDuration;
80 public class LeaderTest extends AbstractLeaderTest<Leader> {
82 static final String FOLLOWER_ID = "follower";
83 public static final String LEADER_ID = "leader";
85 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
86 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
88 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
89 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
91 private Leader leader;
92 private final short payloadVersion = 5;
96 public void tearDown() {
105 public void testHandleMessageForUnknownMessage() {
106 logStart("testHandleMessageForUnknownMessage");
108 leader = new Leader(createActorContext());
110 // handle message should null when it receives an unknown message
111 assertNull(leader.handleMessage(followerActor, "foo"));
115 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
116 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
118 MockRaftActorContext actorContext = createActorContextWithFollower();
119 actorContext.setCommitIndex(-1);
120 actorContext.setPayloadVersion(payloadVersion);
123 actorContext.getTermInformation().update(term, "");
125 leader = new Leader(actorContext);
126 actorContext.setCurrentBehavior(leader);
128 // Leader should send an immediate heartbeat with no entries as follower is inactive.
129 final long lastIndex = actorContext.getReplicatedLog().lastIndex();
130 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
131 assertEquals("getTerm", term, appendEntries.getTerm());
132 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
133 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
134 assertEquals("Entries size", 0, appendEntries.getEntries().size());
135 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
137 // The follower would normally reply - simulate that explicitly here.
138 leader.handleMessage(followerActor, new AppendEntriesReply(
139 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
140 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
142 followerActor.underlyingActor().clear();
144 // Sleep for the heartbeat interval so AppendEntries is sent.
145 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
146 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
148 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
150 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
151 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
152 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
153 assertEquals("Entries size", 1, appendEntries.getEntries().size());
154 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
155 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
156 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
160 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
161 return sendReplicate(actorContext, 1, index);
164 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term,
166 return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
169 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
170 final Payload payload) {
171 SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
172 actorContext.getReplicatedLog().append(newEntry);
173 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
177 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
178 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
180 MockRaftActorContext actorContext = createActorContextWithFollower();
183 actorContext.getTermInformation().update(term, "");
185 leader = new Leader(actorContext);
187 // Leader will send an immediate heartbeat - ignore it.
188 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
190 // The follower would normally reply - simulate that explicitly here.
191 long lastIndex = actorContext.getReplicatedLog().lastIndex();
192 leader.handleMessage(followerActor, new AppendEntriesReply(
193 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
194 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
196 followerActor.underlyingActor().clear();
198 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
200 // State should not change
201 assertTrue(raftBehavior instanceof Leader);
203 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
204 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
205 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
206 assertEquals("Entries size", 1, appendEntries.getEntries().size());
207 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
208 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
209 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
210 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
214 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() {
215 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
217 MockRaftActorContext actorContext = createActorContextWithFollower();
218 actorContext.setCommitIndex(-1);
219 actorContext.setLastApplied(-1);
221 // The raft context is initialized with a couple log entries. However the commitIndex
222 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
223 // committed and applied. Now it regains leadership with a higher term (2).
224 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
225 long newTerm = prevTerm + 1;
226 actorContext.getTermInformation().update(newTerm, "");
228 leader = new Leader(actorContext);
229 actorContext.setCurrentBehavior(leader);
231 // Leader will send an immediate heartbeat - ignore it.
232 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
234 // The follower replies with the leader's current last index and term, simulating that it is
235 // up to date with the leader.
236 long lastIndex = actorContext.getReplicatedLog().lastIndex();
237 leader.handleMessage(followerActor, new AppendEntriesReply(
238 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
240 // The commit index should not get updated even though consensus was reached. This is b/c the
241 // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
242 // from previous terms by counting replicas".
243 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
245 followerActor.underlyingActor().clear();
247 // Now replicate a new entry with the new term 2.
248 long newIndex = lastIndex + 1;
249 sendReplicate(actorContext, newTerm, newIndex);
251 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
252 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
253 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
254 assertEquals("Entries size", 1, appendEntries.getEntries().size());
255 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
256 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
257 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
259 // The follower replies with success. The leader should now update the commit index to the new index
260 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
261 // prior entries are committed indirectly".
262 leader.handleMessage(followerActor, new AppendEntriesReply(
263 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
265 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
269 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() {
270 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
272 MockRaftActorContext actorContext = createActorContextWithFollower();
273 actorContext.setRaftPolicy(createRaftPolicy(true, true));
276 actorContext.getTermInformation().update(term, "");
278 leader = new Leader(actorContext);
280 // Leader will send an immediate heartbeat - ignore it.
281 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
283 // The follower would normally reply - simulate that explicitly here.
284 long lastIndex = actorContext.getReplicatedLog().lastIndex();
285 leader.handleMessage(followerActor, new AppendEntriesReply(
286 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
287 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
289 followerActor.underlyingActor().clear();
291 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
293 // State should not change
294 assertTrue(raftBehavior instanceof Leader);
296 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
297 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
298 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
299 assertEquals("Entries size", 1, appendEntries.getEntries().size());
300 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
301 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
302 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
303 assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
307 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
308 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
310 MockRaftActorContext actorContext = createActorContextWithFollower();
311 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
313 public FiniteDuration getHeartBeatInterval() {
314 return FiniteDuration.apply(5, TimeUnit.SECONDS);
319 actorContext.getTermInformation().update(term, "");
321 leader = new Leader(actorContext);
323 // Leader will send an immediate heartbeat - ignore it.
324 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
326 // The follower would normally reply - simulate that explicitly here.
327 long lastIndex = actorContext.getReplicatedLog().lastIndex();
328 leader.handleMessage(followerActor, new AppendEntriesReply(
329 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
330 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
332 followerActor.underlyingActor().clear();
334 for (int i = 0; i < 5; i++) {
335 sendReplicate(actorContext, lastIndex + i + 1);
338 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
339 // We expect only 1 message to be sent because of two reasons,
340 // - an append entries reply was not received
341 // - the heartbeat interval has not expired
342 // In this scenario if multiple messages are sent they would likely be duplicates
343 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
347 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
348 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
350 MockRaftActorContext actorContext = createActorContextWithFollower();
351 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
353 public FiniteDuration getHeartBeatInterval() {
354 return FiniteDuration.apply(5, TimeUnit.SECONDS);
359 actorContext.getTermInformation().update(term, "");
361 leader = new Leader(actorContext);
363 // Leader will send an immediate heartbeat - ignore it.
364 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
366 // The follower would normally reply - simulate that explicitly here.
367 long lastIndex = actorContext.getReplicatedLog().lastIndex();
368 leader.handleMessage(followerActor, new AppendEntriesReply(
369 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
370 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
372 followerActor.underlyingActor().clear();
374 for (int i = 0; i < 3; i++) {
375 sendReplicate(actorContext, lastIndex + i + 1);
376 leader.handleMessage(followerActor, new AppendEntriesReply(
377 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
380 // We are expecting six messages here -- a request to replicate and a consensus-reached message
381 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
382 assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
383 for (int i = 0; i < 3; i++) {
384 assertRequestEntry(lastIndex, allMessages, i);
385 assertCommitEntry(lastIndex, allMessages, i);
388 // Now perform another commit, eliciting a request to persist
389 sendReplicate(actorContext, lastIndex + 3 + 1);
390 allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
391 // This elicits another message for request to replicate
392 assertEquals("The number of request entries collected", 7, allMessages.size());
393 assertRequestEntry(lastIndex, allMessages, 3);
395 sendReplicate(actorContext, lastIndex + 4 + 1);
396 allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
397 assertEquals("The number of request entries collected", 7, allMessages.size());
400 private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
401 final int messageNr) {
402 final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
403 assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
404 assertEquals(List.of(), commitReq.getEntries());
407 private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
408 final int messageNr) {
409 final AppendEntries req = allMessages.get(2 * messageNr);
410 assertEquals(lastIndex + messageNr, req.getLeaderCommit());
412 final List<ReplicatedLogEntry> entries = req.getEntries();
413 assertEquals(1, entries.size());
414 assertEquals(messageNr + 2, entries.get(0).getIndex());
418 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
419 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
421 MockRaftActorContext actorContext = createActorContextWithFollower();
422 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
424 public FiniteDuration getHeartBeatInterval() {
425 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
430 actorContext.getTermInformation().update(term, "");
432 leader = new Leader(actorContext);
434 // Leader will send an immediate heartbeat - ignore it.
435 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
437 // The follower would normally reply - simulate that explicitly here.
438 long lastIndex = actorContext.getReplicatedLog().lastIndex();
439 leader.handleMessage(followerActor, new AppendEntriesReply(
440 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
441 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
443 followerActor.underlyingActor().clear();
445 sendReplicate(actorContext, lastIndex + 1);
447 // Wait slightly longer than heartbeat duration
448 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
450 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
452 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
453 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
455 assertEquals(1, allMessages.get(0).getEntries().size());
456 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
457 assertEquals(1, allMessages.get(1).getEntries().size());
458 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
463 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
464 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
466 MockRaftActorContext actorContext = createActorContextWithFollower();
467 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
469 public FiniteDuration getHeartBeatInterval() {
470 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
475 actorContext.getTermInformation().update(term, "");
477 leader = new Leader(actorContext);
479 // Leader will send an immediate heartbeat - ignore it.
480 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
482 // The follower would normally reply - simulate that explicitly here.
483 long lastIndex = actorContext.getReplicatedLog().lastIndex();
484 leader.handleMessage(followerActor, new AppendEntriesReply(
485 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
486 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
488 followerActor.underlyingActor().clear();
490 for (int i = 0; i < 3; i++) {
491 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
492 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
495 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
496 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
500 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
501 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
503 MockRaftActorContext actorContext = createActorContextWithFollower();
504 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
506 public FiniteDuration getHeartBeatInterval() {
507 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
512 actorContext.getTermInformation().update(term, "");
514 leader = new Leader(actorContext);
516 // Leader will send an immediate heartbeat - ignore it.
517 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
519 // The follower would normally reply - simulate that explicitly here.
520 long lastIndex = actorContext.getReplicatedLog().lastIndex();
521 leader.handleMessage(followerActor, new AppendEntriesReply(
522 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
523 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
525 followerActor.underlyingActor().clear();
527 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
528 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
529 sendReplicate(actorContext, lastIndex + 1);
531 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
532 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
534 assertEquals(0, allMessages.get(0).getEntries().size());
535 assertEquals(1, allMessages.get(1).getEntries().size());
540 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
541 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
543 MockRaftActorContext actorContext = createActorContext();
545 leader = new Leader(actorContext);
547 actorContext.setLastApplied(0);
549 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
550 long term = actorContext.getTermInformation().getCurrentTerm();
551 ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
552 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
554 actorContext.getReplicatedLog().append(newEntry);
556 final Identifier id = new MockIdentifier("state-id");
557 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
558 new Replicate(leaderActor, id, newEntry, true));
560 // State should not change
561 assertTrue(raftBehavior instanceof Leader);
563 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
565 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
566 // one since lastApplied state is 0.
567 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
568 leaderActor, ApplyState.class);
569 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
571 for (int i = 0; i <= newLogIndex - 1; i++) {
572 ApplyState applyState = applyStateList.get(i);
573 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
574 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
577 ApplyState last = applyStateList.get((int) newLogIndex - 1);
578 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
579 assertEquals("getIdentifier", id, last.getIdentifier());
583 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
584 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
586 final MockRaftActorContext actorContext = createActorContextWithFollower();
589 actorContext.getReplicatedLog().removeFrom(0);
591 final int commitIndex = 3;
592 final int snapshotIndex = 2;
593 final int snapshotTerm = 1;
595 // set the snapshot variables in replicatedlog
596 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
597 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
598 actorContext.setCommitIndex(commitIndex);
599 //set follower timeout to 2 mins, helps during debugging
600 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
602 leader = new Leader(actorContext);
604 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
605 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
607 //update follower timestamp
608 leader.markFollowerActive(FOLLOWER_ID);
610 ByteString bs = toByteString(Map.of("1", "A", "2", "B", "3", "C"));
611 leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
612 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
613 -1, null, null), ByteSource.wrap(bs.toByteArray())));
614 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
615 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
616 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
617 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
619 //send first chunk and no InstallSnapshotReply received yet
621 fts.incrementChunkIndex();
623 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
624 TimeUnit.MILLISECONDS);
626 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
628 AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
630 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
632 //InstallSnapshotReply received
633 fts.markSendStatus(true);
635 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
637 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
639 assertEquals(commitIndex, is.getLastIncludedIndex());
643 public void testSendAppendEntriesSnapshotScenario() {
644 logStart("testSendAppendEntriesSnapshotScenario");
646 final MockRaftActorContext actorContext = createActorContextWithFollower();
648 Map<String, String> leadersSnapshot = new HashMap<>();
649 leadersSnapshot.put("1", "A");
650 leadersSnapshot.put("2", "B");
651 leadersSnapshot.put("3", "C");
654 actorContext.getReplicatedLog().removeFrom(0);
656 final int followersLastIndex = 2;
657 final int snapshotIndex = 3;
658 final int newEntryIndex = 4;
659 final int snapshotTerm = 1;
660 final int currentTerm = 2;
662 // set the snapshot variables in replicatedlog
663 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
664 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
665 actorContext.setCommitIndex(followersLastIndex);
667 leader = new Leader(actorContext);
669 // Leader will send an immediate heartbeat - ignore it.
670 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
673 SimpleReplicatedLogEntry entry =
674 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
675 new MockRaftActorContext.MockPayload("D"));
677 actorContext.getReplicatedLog().append(entry);
679 //update follower timestamp
680 leader.markFollowerActive(FOLLOWER_ID);
682 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
683 RaftActorBehavior raftBehavior = leader.handleMessage(
684 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
686 assertTrue(raftBehavior instanceof Leader);
688 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
692 public void testInitiateInstallSnapshot() {
693 logStart("testInitiateInstallSnapshot");
695 MockRaftActorContext actorContext = createActorContextWithFollower();
698 actorContext.getReplicatedLog().removeFrom(0);
700 final int followersLastIndex = 2;
701 final int snapshotIndex = 3;
702 final int newEntryIndex = 4;
703 final int snapshotTerm = 1;
704 final int currentTerm = 2;
706 // set the snapshot variables in replicatedlog
707 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
708 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
709 actorContext.setLastApplied(3);
710 actorContext.setCommitIndex(followersLastIndex);
712 leader = new Leader(actorContext);
714 // Leader will send an immediate heartbeat - ignore it.
715 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
717 // set the snapshot as absent and check if capture-snapshot is invoked.
718 leader.setSnapshotHolder(null);
721 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
722 new MockRaftActorContext.MockPayload("D"));
724 actorContext.getReplicatedLog().append(entry);
726 //update follower timestamp
727 leader.markFollowerActive(FOLLOWER_ID);
729 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
731 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
733 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
735 assertEquals(3, cs.getLastAppliedIndex());
736 assertEquals(1, cs.getLastAppliedTerm());
737 assertEquals(4, cs.getLastIndex());
738 assertEquals(2, cs.getLastTerm());
740 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
741 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
743 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
747 public void testInitiateForceInstallSnapshot() throws Exception {
748 logStart("testInitiateForceInstallSnapshot");
750 MockRaftActorContext actorContext = createActorContextWithFollower();
752 final int followersLastIndex = 2;
753 final int snapshotIndex = -1;
754 final int newEntryIndex = 4;
755 final int snapshotTerm = -1;
756 final int currentTerm = 2;
758 // set the snapshot variables in replicatedlog
759 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
760 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
761 actorContext.setLastApplied(3);
762 actorContext.setCommitIndex(followersLastIndex);
764 actorContext.getReplicatedLog().removeFrom(0);
766 AtomicReference<Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
767 actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
769 leader = new Leader(actorContext);
770 actorContext.setCurrentBehavior(leader);
772 // Leader will send an immediate heartbeat - ignore it.
773 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
775 // set the snapshot as absent and check if capture-snapshot is invoked.
776 leader.setSnapshotHolder(null);
778 for (int i = 0; i < 4; i++) {
779 actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
780 new MockRaftActorContext.MockPayload("X" + i)));
784 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
785 new MockRaftActorContext.MockPayload("D"));
787 actorContext.getReplicatedLog().append(entry);
789 //update follower timestamp
790 leader.markFollowerActive(FOLLOWER_ID);
792 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
793 // installed with a SendInstallSnapshot
794 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
795 RaftVersions.CURRENT_VERSION));
797 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
799 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
800 assertEquals(3, cs.getLastAppliedIndex());
801 assertEquals(1, cs.getLastAppliedTerm());
802 assertEquals(4, cs.getLastIndex());
803 assertEquals(2, cs.getLastTerm());
805 assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
806 assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
808 MessageCollectorActor.clearMessages(followerActor);
810 // Sending Replicate message should not initiate another capture since the first is in progress.
811 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
812 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
814 // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
815 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
816 RaftVersions.CURRENT_VERSION));
817 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
819 // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
820 final byte[] bytes = new byte[]{1, 2, 3};
821 installSnapshotStream.get().orElseThrow().write(bytes);
822 actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
823 Runtime.getRuntime().totalMemory());
824 MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
826 // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
827 MessageCollectorActor.clearMessages(followerActor);
828 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
829 RaftVersions.CURRENT_VERSION));
830 MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
835 public void testInstallSnapshot() {
836 logStart("testInstallSnapshot");
838 final MockRaftActorContext actorContext = createActorContextWithFollower();
840 Map<String, String> leadersSnapshot = new HashMap<>();
841 leadersSnapshot.put("1", "A");
842 leadersSnapshot.put("2", "B");
843 leadersSnapshot.put("3", "C");
846 actorContext.getReplicatedLog().removeFrom(0);
848 final int lastAppliedIndex = 3;
849 final int snapshotIndex = 2;
850 final int snapshotTerm = 1;
851 final int currentTerm = 2;
853 // set the snapshot variables in replicatedlog
854 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
855 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
856 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
857 actorContext.setCommitIndex(lastAppliedIndex);
858 actorContext.setLastApplied(lastAppliedIndex);
860 leader = new Leader(actorContext);
862 // Initial heartbeat.
863 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
865 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
866 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
868 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
869 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(),
870 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
872 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
873 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
875 assertTrue(raftBehavior instanceof Leader);
877 // check if installsnapshot gets called with the correct values.
879 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
880 InstallSnapshot.class);
882 assertNotNull(installSnapshot.getData());
883 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
884 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
886 assertEquals(currentTerm, installSnapshot.getTerm());
890 public void testForceInstallSnapshot() {
891 logStart("testForceInstallSnapshot");
893 final MockRaftActorContext actorContext = createActorContextWithFollower();
895 Map<String, String> leadersSnapshot = new HashMap<>();
896 leadersSnapshot.put("1", "A");
897 leadersSnapshot.put("2", "B");
898 leadersSnapshot.put("3", "C");
900 final int lastAppliedIndex = 3;
901 final int snapshotIndex = -1;
902 final int snapshotTerm = -1;
903 final int currentTerm = 2;
905 // set the snapshot variables in replicatedlog
906 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
907 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
908 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
909 actorContext.setCommitIndex(lastAppliedIndex);
910 actorContext.setLastApplied(lastAppliedIndex);
912 leader = new Leader(actorContext);
914 // Initial heartbeat.
915 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
917 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
918 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
920 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
921 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(),
922 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
924 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
925 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
927 assertTrue(raftBehavior instanceof Leader);
929 // check if installsnapshot gets called with the correct values.
931 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
932 InstallSnapshot.class);
934 assertNotNull(installSnapshot.getData());
935 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
936 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
938 assertEquals(currentTerm, installSnapshot.getTerm());
942 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
943 logStart("testHandleInstallSnapshotReplyLastChunk");
945 MockRaftActorContext actorContext = createActorContextWithFollower();
947 final int commitIndex = 3;
948 final int snapshotIndex = 2;
949 final int snapshotTerm = 1;
950 final int currentTerm = 2;
952 actorContext.setCommitIndex(commitIndex);
954 leader = new Leader(actorContext);
955 actorContext.setCurrentBehavior(leader);
957 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
958 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
960 // Ignore initial heartbeat.
961 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
963 Map<String, String> leadersSnapshot = new HashMap<>();
964 leadersSnapshot.put("1", "A");
965 leadersSnapshot.put("2", "B");
966 leadersSnapshot.put("3", "C");
968 // set the snapshot variables in replicatedlog
970 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
971 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
972 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
974 ByteString bs = toByteString(leadersSnapshot);
975 leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
976 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
977 -1, null, null), ByteSource.wrap(bs.toByteArray())));
978 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
979 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
980 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
981 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
982 while (!fts.isLastChunk(fts.getChunkIndex())) {
984 fts.incrementChunkIndex();
988 actorContext.getReplicatedLog().removeFrom(0);
990 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
991 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
993 assertTrue(raftBehavior instanceof Leader);
995 assertEquals(1, leader.followerLogSize());
996 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
998 assertNull(fli.getInstallSnapshotState());
999 assertEquals(commitIndex, fli.getMatchIndex());
1000 assertEquals(commitIndex + 1, fli.getNextIndex());
1001 assertFalse(leader.hasSnapshot());
1005 public void testSendSnapshotfromInstallSnapshotReply() {
1006 logStart("testSendSnapshotfromInstallSnapshotReply");
1008 MockRaftActorContext actorContext = createActorContextWithFollower();
1010 final int commitIndex = 3;
1011 final int snapshotIndex = 2;
1012 final int snapshotTerm = 1;
1013 final int currentTerm = 2;
1015 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
1017 public int getSnapshotChunkSize() {
1021 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1022 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1024 actorContext.setConfigParams(configParams);
1025 actorContext.setCommitIndex(commitIndex);
1027 leader = new Leader(actorContext);
1028 actorContext.setCurrentBehavior(leader);
1030 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1031 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1033 Map<String, String> leadersSnapshot = new HashMap<>();
1034 leadersSnapshot.put("1", "A");
1035 leadersSnapshot.put("2", "B");
1036 leadersSnapshot.put("3", "C");
1038 // set the snapshot variables in replicatedlog
1039 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1040 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1041 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1043 ByteString bs = toByteString(leadersSnapshot);
1044 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1045 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
1047 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1049 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1050 InstallSnapshot.class);
1052 assertEquals(1, installSnapshot.getChunkIndex());
1053 assertEquals(3, installSnapshot.getTotalChunks());
1055 followerActor.underlyingActor().clear();
1056 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1057 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1059 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1061 assertEquals(2, installSnapshot.getChunkIndex());
1062 assertEquals(3, installSnapshot.getTotalChunks());
1064 followerActor.underlyingActor().clear();
1065 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1066 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1068 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1070 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1071 followerActor.underlyingActor().clear();
1072 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1073 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1075 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1077 assertNull(installSnapshot);
1082 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() {
1083 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1085 MockRaftActorContext actorContext = createActorContextWithFollower();
1087 final int commitIndex = 3;
1088 final int snapshotIndex = 2;
1089 final int snapshotTerm = 1;
1090 final int currentTerm = 2;
1092 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1094 public int getSnapshotChunkSize() {
1099 actorContext.setCommitIndex(commitIndex);
1101 leader = new Leader(actorContext);
1103 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1104 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1106 Map<String, String> leadersSnapshot = new HashMap<>();
1107 leadersSnapshot.put("1", "A");
1108 leadersSnapshot.put("2", "B");
1109 leadersSnapshot.put("3", "C");
1111 // set the snapshot variables in replicatedlog
1112 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1113 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1114 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1116 ByteString bs = toByteString(leadersSnapshot);
1117 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1118 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
1120 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1121 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1123 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1124 InstallSnapshot.class);
1126 assertEquals(1, installSnapshot.getChunkIndex());
1127 assertEquals(3, installSnapshot.getTotalChunks());
1129 followerActor.underlyingActor().clear();
1131 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1132 FOLLOWER_ID, -1, false));
1134 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1135 TimeUnit.MILLISECONDS);
1137 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1139 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1141 assertEquals(1, installSnapshot.getChunkIndex());
1142 assertEquals(3, installSnapshot.getTotalChunks());
1146 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() {
1147 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1149 MockRaftActorContext actorContext = createActorContextWithFollower();
1151 final int commitIndex = 3;
1152 final int snapshotIndex = 2;
1153 final int snapshotTerm = 1;
1154 final int currentTerm = 2;
1156 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1158 public int getSnapshotChunkSize() {
1163 actorContext.setCommitIndex(commitIndex);
1165 leader = new Leader(actorContext);
1167 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1168 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1170 Map<String, String> leadersSnapshot = new HashMap<>();
1171 leadersSnapshot.put("1", "A");
1172 leadersSnapshot.put("2", "B");
1173 leadersSnapshot.put("3", "C");
1175 // set the snapshot variables in replicatedlog
1176 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1177 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1178 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1180 ByteString bs = toByteString(leadersSnapshot);
1181 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1182 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
1184 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1186 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1187 InstallSnapshot.class);
1189 assertEquals(1, installSnapshot.getChunkIndex());
1190 assertEquals(3, installSnapshot.getTotalChunks());
1191 assertEquals(OptionalInt.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE),
1192 installSnapshot.getLastChunkHashCode());
1194 final int hashCode = Arrays.hashCode(installSnapshot.getData());
1196 followerActor.underlyingActor().clear();
1198 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1199 FOLLOWER_ID, 1, true));
1201 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1203 assertEquals(2, installSnapshot.getChunkIndex());
1204 assertEquals(3, installSnapshot.getTotalChunks());
1205 assertEquals(OptionalInt.of(hashCode), installSnapshot.getLastChunkHashCode());
1209 public void testLeaderInstallSnapshotState() throws IOException {
1210 logStart("testLeaderInstallSnapshotState");
1212 Map<String, String> leadersSnapshot = new HashMap<>();
1213 leadersSnapshot.put("1", "A");
1214 leadersSnapshot.put("2", "B");
1215 leadersSnapshot.put("3", "C");
1217 ByteString bs = toByteString(leadersSnapshot);
1218 byte[] barray = bs.toByteArray();
1220 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1221 fts.setSnapshotBytes(ByteSource.wrap(barray));
1223 assertEquals(bs.size(), barray.length);
1226 for (int i = 0; i < barray.length; i = i + 50) {
1227 int length = i + 50;
1230 if (i + 50 > barray.length) {
1231 length = barray.length;
1234 byte[] chunk = fts.getNextChunk();
1235 assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1236 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1238 fts.markSendStatus(true);
1239 if (!fts.isLastChunk(chunkIndex)) {
1240 fts.incrementChunkIndex();
1244 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1249 protected Leader createBehavior(final RaftActorContext actorContext) {
1250 return new Leader(actorContext);
1254 protected MockRaftActorContext createActorContext() {
1255 return createActorContext(leaderActor);
1259 protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
1260 return createActorContext(LEADER_ID, actorRef);
1263 private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
1264 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1265 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1266 configParams.setElectionTimeoutFactor(100000);
1267 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1268 context.setConfigParams(configParams);
1269 context.setPayloadVersion(payloadVersion);
1273 private MockRaftActorContext createActorContextWithFollower() {
1274 MockRaftActorContext actorContext = createActorContext();
1275 actorContext.setPeerAddresses(Map.of(FOLLOWER_ID, followerActor.path().toString()));
1276 return actorContext;
1279 private MockRaftActorContext createFollowerActorContextWithLeader() {
1280 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1281 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1282 followerConfig.setElectionTimeoutFactor(10000);
1283 followerActorContext.setConfigParams(followerConfig);
1284 followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString()));
1285 return followerActorContext;
1289 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() {
1290 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1292 final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1294 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1296 Follower follower = new Follower(followerActorContext);
1297 followerActor.underlyingActor().setBehavior(follower);
1298 followerActorContext.setCurrentBehavior(follower);
1300 Map<String, String> peerAddresses = new HashMap<>();
1301 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1303 leaderActorContext.setPeerAddresses(peerAddresses);
1305 leaderActorContext.getReplicatedLog().removeFrom(0);
1308 leaderActorContext.setReplicatedLog(
1309 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1311 leaderActorContext.setCommitIndex(1);
1313 followerActorContext.getReplicatedLog().removeFrom(0);
1315 // follower too has the exact same log entries and has the same commit index
1316 followerActorContext.setReplicatedLog(
1317 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1319 followerActorContext.setCommitIndex(1);
1321 leader = new Leader(leaderActorContext);
1322 leaderActorContext.setCurrentBehavior(leader);
1324 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1326 assertEquals(-1, appendEntries.getLeaderCommit());
1327 assertEquals(0, appendEntries.getEntries().size());
1328 assertEquals(0, appendEntries.getPrevLogIndex());
1330 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1331 leaderActor, AppendEntriesReply.class);
1333 assertEquals(2, appendEntriesReply.getLogLastIndex());
1334 assertEquals(1, appendEntriesReply.getLogLastTerm());
1336 // follower returns its next index
1337 assertEquals(2, appendEntriesReply.getLogLastIndex());
1338 assertEquals(1, appendEntriesReply.getLogLastTerm());
1344 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() {
1345 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1347 final MockRaftActorContext leaderActorContext = createActorContext();
1349 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1350 followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString()));
1352 Follower follower = new Follower(followerActorContext);
1353 followerActor.underlyingActor().setBehavior(follower);
1354 followerActorContext.setCurrentBehavior(follower);
1356 Map<String, String> leaderPeerAddresses = new HashMap<>();
1357 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1359 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1361 leaderActorContext.getReplicatedLog().removeFrom(0);
1363 leaderActorContext.setReplicatedLog(
1364 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1366 leaderActorContext.setCommitIndex(1);
1368 followerActorContext.getReplicatedLog().removeFrom(0);
1370 followerActorContext.setReplicatedLog(
1371 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1373 // follower has the same log entries but its commit index > leaders commit index
1374 followerActorContext.setCommitIndex(2);
1376 leader = new Leader(leaderActorContext);
1378 // Initial heartbeat
1379 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1381 assertEquals(-1, appendEntries.getLeaderCommit());
1382 assertEquals(0, appendEntries.getEntries().size());
1383 assertEquals(0, appendEntries.getPrevLogIndex());
1385 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1386 leaderActor, AppendEntriesReply.class);
1388 assertEquals(2, appendEntriesReply.getLogLastIndex());
1389 assertEquals(1, appendEntriesReply.getLogLastTerm());
1391 leaderActor.underlyingActor().setBehavior(follower);
1392 leader.handleMessage(followerActor, appendEntriesReply);
1394 leaderActor.underlyingActor().clear();
1395 followerActor.underlyingActor().clear();
1397 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1398 TimeUnit.MILLISECONDS);
1400 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1402 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1404 assertEquals(2, appendEntries.getLeaderCommit());
1405 assertEquals(0, appendEntries.getEntries().size());
1406 assertEquals(2, appendEntries.getPrevLogIndex());
1408 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1410 assertEquals(2, appendEntriesReply.getLogLastIndex());
1411 assertEquals(1, appendEntriesReply.getLogLastTerm());
1413 assertEquals(2, followerActorContext.getCommitIndex());
1419 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1420 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1422 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1423 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1424 new FiniteDuration(1000, TimeUnit.SECONDS));
1426 leaderActorContext.setReplicatedLog(
1427 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1428 long leaderCommitIndex = 2;
1429 leaderActorContext.setCommitIndex(leaderCommitIndex);
1430 leaderActorContext.setLastApplied(leaderCommitIndex);
1432 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1433 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1435 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1437 followerActorContext.setReplicatedLog(
1438 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1439 followerActorContext.setCommitIndex(0);
1440 followerActorContext.setLastApplied(0);
1442 Follower follower = new Follower(followerActorContext);
1443 followerActor.underlyingActor().setBehavior(follower);
1445 leader = new Leader(leaderActorContext);
1447 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1448 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1449 AppendEntriesReply.class);
1451 MessageCollectorActor.clearMessages(followerActor);
1452 MessageCollectorActor.clearMessages(leaderActor);
1454 // Verify initial AppendEntries sent.
1455 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1456 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1457 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1459 leaderActor.underlyingActor().setBehavior(leader);
1461 leader.handleMessage(followerActor, appendEntriesReply);
1463 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1464 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1466 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1467 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1468 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1470 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1471 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1472 appendEntries.getEntries().get(0).getData());
1473 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1474 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1475 appendEntries.getEntries().get(1).getData());
1477 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1478 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1480 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1482 ApplyState applyState = applyStateList.get(0);
1483 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1484 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1485 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1486 applyState.getReplicatedLogEntry().getData());
1488 applyState = applyStateList.get(1);
1489 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1490 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1491 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1492 applyState.getReplicatedLogEntry().getData());
1494 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1495 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1499 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1500 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1502 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1503 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1504 new FiniteDuration(1000, TimeUnit.SECONDS));
1506 leaderActorContext.setReplicatedLog(
1507 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1508 long leaderCommitIndex = 1;
1509 leaderActorContext.setCommitIndex(leaderCommitIndex);
1510 leaderActorContext.setLastApplied(leaderCommitIndex);
1512 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1513 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1515 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1517 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1518 followerActorContext.setCommitIndex(-1);
1519 followerActorContext.setLastApplied(-1);
1521 Follower follower = new Follower(followerActorContext);
1522 followerActor.underlyingActor().setBehavior(follower);
1523 followerActorContext.setCurrentBehavior(follower);
1525 leader = new Leader(leaderActorContext);
1527 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1528 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1529 AppendEntriesReply.class);
1531 MessageCollectorActor.clearMessages(followerActor);
1532 MessageCollectorActor.clearMessages(leaderActor);
1534 // Verify initial AppendEntries sent with the leader's current commit index.
1535 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1536 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1537 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1539 leaderActor.underlyingActor().setBehavior(leader);
1540 leaderActorContext.setCurrentBehavior(leader);
1542 leader.handleMessage(followerActor, appendEntriesReply);
1544 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1545 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1547 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1548 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1549 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1551 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1552 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1553 appendEntries.getEntries().get(0).getData());
1554 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1555 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1556 appendEntries.getEntries().get(1).getData());
1558 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1559 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1561 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1563 ApplyState applyState = applyStateList.get(0);
1564 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1565 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1566 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1567 applyState.getReplicatedLogEntry().getData());
1569 applyState = applyStateList.get(1);
1570 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1571 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1572 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1573 applyState.getReplicatedLogEntry().getData());
1575 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1576 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1580 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1581 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1583 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1584 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1585 new FiniteDuration(1000, TimeUnit.SECONDS));
1587 leaderActorContext.setReplicatedLog(
1588 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1589 long leaderCommitIndex = 1;
1590 leaderActorContext.setCommitIndex(leaderCommitIndex);
1591 leaderActorContext.setLastApplied(leaderCommitIndex);
1593 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1594 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1596 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1598 followerActorContext.setReplicatedLog(
1599 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1600 followerActorContext.setCommitIndex(-1);
1601 followerActorContext.setLastApplied(-1);
1603 Follower follower = new Follower(followerActorContext);
1604 followerActor.underlyingActor().setBehavior(follower);
1605 followerActorContext.setCurrentBehavior(follower);
1607 leader = new Leader(leaderActorContext);
1609 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1610 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1611 AppendEntriesReply.class);
1613 MessageCollectorActor.clearMessages(followerActor);
1614 MessageCollectorActor.clearMessages(leaderActor);
1616 // Verify initial AppendEntries sent with the leader's current commit index.
1617 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1618 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1619 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1621 leaderActor.underlyingActor().setBehavior(leader);
1622 leaderActorContext.setCurrentBehavior(leader);
1624 leader.handleMessage(followerActor, appendEntriesReply);
1626 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1627 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1629 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1630 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1631 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1633 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1634 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1635 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1636 appendEntries.getEntries().get(0).getData());
1637 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1638 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1639 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1640 appendEntries.getEntries().get(1).getData());
1642 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1643 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1645 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1647 ApplyState applyState = applyStateList.get(0);
1648 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1649 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1650 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1651 applyState.getReplicatedLogEntry().getData());
1653 applyState = applyStateList.get(1);
1654 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1655 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1656 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1657 applyState.getReplicatedLogEntry().getData());
1659 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1660 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1661 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1665 public void testHandleAppendEntriesReplyWithNewerTerm() {
1666 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1668 MockRaftActorContext leaderActorContext = createActorContext();
1669 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1670 new FiniteDuration(10000, TimeUnit.SECONDS));
1672 leaderActorContext.setReplicatedLog(
1673 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1675 leader = new Leader(leaderActorContext);
1676 leaderActor.underlyingActor().setBehavior(leader);
1677 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1679 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1680 AppendEntriesReply.class);
1682 assertEquals(false, appendEntriesReply.isSuccess());
1683 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1685 MessageCollectorActor.clearMessages(leaderActor);
1689 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1690 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1692 MockRaftActorContext leaderActorContext = createActorContext();
1693 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1694 new FiniteDuration(10000, TimeUnit.SECONDS));
1696 leaderActorContext.setReplicatedLog(
1697 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1698 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1700 leader = new Leader(leaderActorContext);
1701 leaderActor.underlyingActor().setBehavior(leader);
1702 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1704 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1705 AppendEntriesReply.class);
1707 assertEquals(false, appendEntriesReply.isSuccess());
1708 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1710 MessageCollectorActor.clearMessages(leaderActor);
1714 public void testHandleAppendEntriesReplySuccess() {
1715 logStart("testHandleAppendEntriesReplySuccess");
1717 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1719 leaderActorContext.setReplicatedLog(
1720 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1722 leaderActorContext.setCommitIndex(1);
1723 leaderActorContext.setLastApplied(1);
1724 leaderActorContext.getTermInformation().update(1, "leader");
1726 leader = new Leader(leaderActorContext);
1728 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1730 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1731 assertEquals(RaftVersions.FLUORINE_VERSION, followerInfo.getRaftVersion());
1733 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1735 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1737 assertEquals(RaftState.Leader, raftActorBehavior.state());
1739 assertEquals(2, leaderActorContext.getCommitIndex());
1741 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1742 leaderActor, ApplyJournalEntries.class);
1744 assertEquals(2, leaderActorContext.getLastApplied());
1746 assertEquals(2, applyJournalEntries.getToIndex());
1748 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1751 assertEquals(1,applyStateList.size());
1753 ApplyState applyState = applyStateList.get(0);
1755 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1757 assertEquals(2, followerInfo.getMatchIndex());
1758 assertEquals(3, followerInfo.getNextIndex());
1759 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1760 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1764 public void testHandleAppendEntriesReplyUnknownFollower() {
1765 logStart("testHandleAppendEntriesReplyUnknownFollower");
1767 MockRaftActorContext leaderActorContext = createActorContext();
1769 leader = new Leader(leaderActorContext);
1771 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1773 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1775 assertEquals(RaftState.Leader, raftActorBehavior.state());
1779 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1780 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1782 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1783 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1784 new FiniteDuration(1000, TimeUnit.SECONDS));
1785 // Note: the size here depends on estimate
1786 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(246);
1788 leaderActorContext.setReplicatedLog(
1789 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1790 long leaderCommitIndex = 3;
1791 leaderActorContext.setCommitIndex(leaderCommitIndex);
1792 leaderActorContext.setLastApplied(leaderCommitIndex);
1794 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1795 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1796 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1797 final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1799 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1801 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1802 followerActorContext.setCommitIndex(-1);
1803 followerActorContext.setLastApplied(-1);
1805 Follower follower = new Follower(followerActorContext);
1806 followerActor.underlyingActor().setBehavior(follower);
1807 followerActorContext.setCurrentBehavior(follower);
1809 leader = new Leader(leaderActorContext);
1811 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1812 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1813 AppendEntriesReply.class);
1815 MessageCollectorActor.clearMessages(followerActor);
1816 MessageCollectorActor.clearMessages(leaderActor);
1818 // Verify initial AppendEntries sent with the leader's current commit index.
1819 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1820 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1821 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1823 leaderActor.underlyingActor().setBehavior(leader);
1824 leaderActorContext.setCurrentBehavior(leader);
1826 leader.handleMessage(followerActor, appendEntriesReply);
1828 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1829 AppendEntries.class, 2);
1830 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1832 appendEntries = appendEntriesList.get(0);
1833 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1834 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1835 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1837 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1838 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1839 appendEntries.getEntries().get(0).getData());
1840 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1841 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1842 appendEntries.getEntries().get(1).getData());
1844 appendEntries = appendEntriesList.get(1);
1845 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1846 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1847 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1849 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1850 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1851 appendEntries.getEntries().get(0).getData());
1852 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1853 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1854 appendEntries.getEntries().get(1).getData());
1856 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1857 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1859 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1861 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1862 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1866 public void testHandleRequestVoteReply() {
1867 logStart("testHandleRequestVoteReply");
1869 MockRaftActorContext leaderActorContext = createActorContext();
1871 leader = new Leader(leaderActorContext);
1873 // Should be a no-op.
1874 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1875 new RequestVoteReply(1, true));
1877 assertEquals(RaftState.Leader, raftActorBehavior.state());
1879 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1881 assertEquals(RaftState.Leader, raftActorBehavior.state());
1885 public void testIsolatedLeaderCheckNoFollowers() {
1886 logStart("testIsolatedLeaderCheckNoFollowers");
1888 MockRaftActorContext leaderActorContext = createActorContext();
1890 leader = new Leader(leaderActorContext);
1891 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1892 assertTrue(newBehavior instanceof Leader);
1896 public void testIsolatedLeaderCheckNoVotingFollowers() {
1897 logStart("testIsolatedLeaderCheckNoVotingFollowers");
1899 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1900 Follower follower = new Follower(followerActorContext);
1901 followerActor.underlyingActor().setBehavior(follower);
1903 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1904 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1905 new FiniteDuration(1000, TimeUnit.SECONDS));
1906 leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1908 leader = new Leader(leaderActorContext);
1909 leader.getFollower(FOLLOWER_ID).markFollowerActive();
1910 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1911 assertTrue("Expected Leader", newBehavior instanceof Leader);
1914 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
1915 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1916 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1918 MockRaftActorContext leaderActorContext = createActorContext();
1920 Map<String, String> peerAddresses = new HashMap<>();
1921 peerAddresses.put("follower-1", followerActor1.path().toString());
1922 peerAddresses.put("follower-2", followerActor2.path().toString());
1924 leaderActorContext.setPeerAddresses(peerAddresses);
1925 leaderActorContext.setRaftPolicy(raftPolicy);
1927 leader = new Leader(leaderActorContext);
1929 leader.markFollowerActive("follower-1");
1930 leader.markFollowerActive("follower-2");
1931 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1932 assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1934 // kill 1 follower and verify if that got killed
1935 final TestKit probe = new TestKit(getSystem());
1936 probe.watch(followerActor1);
1937 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1938 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1939 assertEquals(termMsg1.getActor(), followerActor1);
1941 leader.markFollowerInActive("follower-1");
1942 leader.markFollowerActive("follower-2");
1943 newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1944 assertTrue("Behavior not instance of Leader when majority of followers are active",
1945 newBehavior instanceof Leader);
1947 // kill 2nd follower and leader should change to Isolated leader
1948 followerActor2.tell(PoisonPill.getInstance(), null);
1949 probe.watch(followerActor2);
1950 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1951 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1952 assertEquals(termMsg2.getActor(), followerActor2);
1954 leader.markFollowerInActive("follower-2");
1955 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1959 public void testIsolatedLeaderCheckTwoFollowers() {
1960 logStart("testIsolatedLeaderCheckTwoFollowers");
1962 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1964 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1965 newBehavior instanceof IsolatedLeader);
1969 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() {
1970 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1972 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1974 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1975 newBehavior instanceof Leader);
1979 public void testLaggingFollowerStarvation() {
1980 logStart("testLaggingFollowerStarvation");
1982 String leaderActorId = actorFactory.generateActorId("leader");
1983 String follower1ActorId = actorFactory.generateActorId("follower");
1984 String follower2ActorId = actorFactory.generateActorId("follower");
1986 final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1987 final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1989 MockRaftActorContext leaderActorContext =
1990 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1992 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1993 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1994 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1996 leaderActorContext.setConfigParams(configParams);
1998 leaderActorContext.setReplicatedLog(
1999 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
2001 Map<String, String> peerAddresses = new HashMap<>();
2002 peerAddresses.put(follower1ActorId,
2003 follower1Actor.path().toString());
2004 peerAddresses.put(follower2ActorId,
2005 follower2Actor.path().toString());
2007 leaderActorContext.setPeerAddresses(peerAddresses);
2008 leaderActorContext.getTermInformation().update(1, leaderActorId);
2010 leader = createBehavior(leaderActorContext);
2012 leaderActor.underlyingActor().setBehavior(leader);
2014 for (int i = 1; i < 6; i++) {
2015 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
2016 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
2017 new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
2018 assertTrue(newBehavior == leader);
2019 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
2022 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
2023 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
2025 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
2026 heartbeats.size() > 1);
2028 // Check if follower-2 got AppendEntries during this time and was not starved
2029 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
2031 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
2032 appendEntries.size() > 1);
2036 public void testReplicationConsensusWithNonVotingFollower() {
2037 logStart("testReplicationConsensusWithNonVotingFollower");
2039 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2040 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2041 new FiniteDuration(1000, TimeUnit.SECONDS));
2043 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2044 leaderActorContext.setCommitIndex(-1);
2045 leaderActorContext.setLastApplied(-1);
2047 String nonVotingFollowerId = "nonvoting-follower";
2048 ActorRef nonVotingFollowerActor = actorFactory.createActor(
2049 MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
2051 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2052 VotingState.NON_VOTING);
2054 leader = new Leader(leaderActorContext);
2055 leaderActorContext.setCurrentBehavior(leader);
2057 // Ignore initial heartbeats
2058 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2059 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2061 MessageCollectorActor.clearMessages(followerActor);
2062 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2063 MessageCollectorActor.clearMessages(leaderActor);
2065 // Send a Replicate message and wait for AppendEntries.
2066 sendReplicate(leaderActorContext, 0);
2068 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2069 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2071 // Send reply only from the voting follower and verify consensus via ApplyState.
2072 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2074 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2076 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2078 MessageCollectorActor.clearMessages(followerActor);
2079 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2080 MessageCollectorActor.clearMessages(leaderActor);
2082 // Send another Replicate message
2083 sendReplicate(leaderActorContext, 1);
2085 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2086 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2087 AppendEntries.class);
2088 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2089 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2091 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2092 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2094 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2096 // Send reply from the voting follower and verify consensus.
2097 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2099 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2103 public void testTransferLeadershipWithFollowerInSync() {
2104 logStart("testTransferLeadershipWithFollowerInSync");
2106 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2107 leaderActorContext.setLastApplied(-1);
2108 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2109 new FiniteDuration(1000, TimeUnit.SECONDS));
2110 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2112 leader = new Leader(leaderActorContext);
2113 leaderActorContext.setCurrentBehavior(leader);
2115 // Initial heartbeat
2116 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2117 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2118 MessageCollectorActor.clearMessages(followerActor);
2120 sendReplicate(leaderActorContext, 0);
2121 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2123 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2124 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2125 MessageCollectorActor.clearMessages(followerActor);
2127 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2128 leader.transferLeadership(mockTransferCohort);
2130 verify(mockTransferCohort, never()).transferComplete();
2131 doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2132 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2133 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2135 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2136 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2138 // Leader should force an election timeout
2139 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2141 verify(mockTransferCohort).transferComplete();
2145 public void testTransferLeadershipWithEmptyLog() {
2146 logStart("testTransferLeadershipWithEmptyLog");
2148 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2149 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2150 new FiniteDuration(1000, TimeUnit.SECONDS));
2151 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2153 leader = new Leader(leaderActorContext);
2154 leaderActorContext.setCurrentBehavior(leader);
2156 // Initial heartbeat
2157 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2158 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2159 MessageCollectorActor.clearMessages(followerActor);
2161 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2162 doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2163 leader.transferLeadership(mockTransferCohort);
2165 verify(mockTransferCohort, never()).transferComplete();
2166 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2167 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2169 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2170 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2172 // Leader should force an election timeout
2173 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2175 verify(mockTransferCohort).transferComplete();
2179 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2180 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2182 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2183 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2184 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2186 leader = new Leader(leaderActorContext);
2187 leaderActorContext.setCurrentBehavior(leader);
2189 // Initial heartbeat
2190 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2191 MessageCollectorActor.clearMessages(followerActor);
2193 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2194 doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2195 leader.transferLeadership(mockTransferCohort);
2197 verify(mockTransferCohort, never()).transferComplete();
2199 // Sync up the follower.
2200 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2201 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2202 MessageCollectorActor.clearMessages(followerActor);
2204 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2205 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2206 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2207 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2208 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2210 // Leader should force an election timeout
2211 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2213 verify(mockTransferCohort).transferComplete();
2217 public void testTransferLeadershipWithFollowerSyncTimeout() {
2218 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2220 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2221 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2222 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2223 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2224 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2226 leader = new Leader(leaderActorContext);
2227 leaderActorContext.setCurrentBehavior(leader);
2229 // Initial heartbeat
2230 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2231 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2232 MessageCollectorActor.clearMessages(followerActor);
2234 sendReplicate(leaderActorContext, 0);
2235 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2237 MessageCollectorActor.clearMessages(followerActor);
2239 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2240 leader.transferLeadership(mockTransferCohort);
2242 verify(mockTransferCohort, never()).transferComplete();
2244 // Send heartbeats to time out the transfer.
2245 for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2246 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2247 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2248 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2251 verify(mockTransferCohort).abortTransfer();
2252 verify(mockTransferCohort, never()).transferComplete();
2253 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2257 public void testReplicationWithPayloadSizeThatExceedsThreshold() {
2258 logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
2260 final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
2261 List.of(new SimpleReplicatedLogEntry(0, 1,
2262 new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
2263 final MockRaftActorContext.MockPayload largePayload =
2264 new MockRaftActorContext.MockPayload("large", serializedSize);
2266 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2267 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2268 new FiniteDuration(300, TimeUnit.MILLISECONDS));
2269 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
2270 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2271 leaderActorContext.setCommitIndex(-1);
2272 leaderActorContext.setLastApplied(-1);
2274 leader = new Leader(leaderActorContext);
2275 leaderActorContext.setCurrentBehavior(leader);
2277 // Send initial heartbeat reply so follower is marked active
2278 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2279 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2280 MessageCollectorActor.clearMessages(followerActor);
2282 // Send normal payload first to prime commit index.
2283 final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2284 sendReplicate(leaderActorContext, term, 0);
2286 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2287 assertEquals("Entries size", 1, appendEntries.getEntries().size());
2288 assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
2290 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
2291 assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
2292 MessageCollectorActor.clearMessages(followerActor);
2294 // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
2295 sendReplicate(leaderActorContext, term, 1, largePayload);
2297 MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2298 assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
2299 assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
2301 final Identifier slicingId = messageSlice.getIdentifier();
2303 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2304 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
2305 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
2306 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2307 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2308 MessageCollectorActor.clearMessages(followerActor);
2310 // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
2312 // Sleep for the heartbeat interval so AppendEntries is sent.
2313 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2314 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2316 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2318 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2319 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2320 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2321 MessageCollectorActor.clearMessages(followerActor);
2323 // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
2325 leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
2326 messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2327 assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
2329 leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
2331 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
2333 MessageCollectorActor.clearMessages(followerActor);
2335 // Send another normal payload.
2337 sendReplicate(leaderActorContext, term, 2);
2339 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2340 assertEquals("Entries size", 1, appendEntries.getEntries().size());
2341 assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
2342 assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
2346 public void testLargePayloadSlicingExpiration() {
2347 logStart("testLargePayloadSlicingExpiration");
2349 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2350 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2351 new FiniteDuration(100, TimeUnit.MILLISECONDS));
2352 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
2353 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
2354 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2355 leaderActorContext.setCommitIndex(-1);
2356 leaderActorContext.setLastApplied(-1);
2358 final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2359 leader = new Leader(leaderActorContext);
2360 leaderActorContext.setCurrentBehavior(leader);
2362 // Send initial heartbeat reply so follower is marked active
2363 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2364 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2365 MessageCollectorActor.clearMessages(followerActor);
2367 sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
2368 leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
2369 MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2371 // Sleep for at least 3 * election timeout so the slicing state expires.
2372 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2373 .getElectionTimeOutInterval().toMillis() * 3 + 50, TimeUnit.MILLISECONDS);
2374 MessageCollectorActor.clearMessages(followerActor);
2376 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2378 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2379 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2380 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2382 MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
2383 MessageCollectorActor.clearMessages(followerActor);
2385 // Send an AppendEntriesReply - this should restart the slicing.
2387 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2388 .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
2390 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
2392 MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2396 public void testLeaderAddressInAppendEntries() {
2397 logStart("testLeaderAddressInAppendEntries");
2399 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2400 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2401 FiniteDuration.create(50, TimeUnit.MILLISECONDS));
2402 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2403 leaderActorContext.setCommitIndex(-1);
2404 leaderActorContext.setLastApplied(-1);
2406 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
2407 peerId -> leaderActor.path().toString());
2409 leader = new Leader(leaderActorContext);
2410 leaderActorContext.setCurrentBehavior(leader);
2412 // Initial heartbeat shouldn't have the leader address
2414 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2415 assertFalse(appendEntries.getLeaderAddress().isPresent());
2416 MessageCollectorActor.clearMessages(followerActor);
2418 // Send AppendEntriesReply indicating the follower needs the leader address
2420 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
2421 RaftVersions.CURRENT_VERSION));
2423 // Sleep for the heartbeat interval so AppendEntries is sent.
2424 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2425 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2427 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2429 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2430 assertTrue(appendEntries.getLeaderAddress().isPresent());
2431 assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().orElseThrow());
2432 MessageCollectorActor.clearMessages(followerActor);
2434 // Send AppendEntriesReply indicating the follower does not need the leader address
2436 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
2437 RaftVersions.CURRENT_VERSION));
2439 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2440 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2442 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2444 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2445 assertFalse(appendEntries.getLeaderAddress().isPresent());
2449 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
2450 final ActorRef actorRef, final RaftRPC rpc) {
2451 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2452 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2455 private static class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2457 private final long electionTimeOutIntervalMillis;
2458 private final int snapshotChunkSize;
2460 MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
2461 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2462 this.snapshotChunkSize = snapshotChunkSize;
2466 public FiniteDuration getElectionTimeOutInterval() {
2467 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2471 public int getSnapshotChunkSize() {
2472 return snapshotChunkSize;