2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.behaviors;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.protobuf.ByteString;
26 import akka.testkit.TestActorRef;
27 import akka.testkit.javadsl.TestKit;
28 import com.google.common.io.ByteSource;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.io.OutputStream;
32 import java.util.Arrays;
33 import java.util.HashMap;
34 import java.util.List;
36 import java.util.Optional;
37 import java.util.OptionalInt;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.apache.commons.lang3.SerializationUtils;
41 import org.junit.After;
42 import org.junit.Test;
43 import org.opendaylight.controller.cluster.messaging.MessageSlice;
44 import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
45 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
46 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
47 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
48 import org.opendaylight.controller.cluster.raft.RaftActorContext;
49 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
50 import org.opendaylight.controller.cluster.raft.RaftState;
51 import org.opendaylight.controller.cluster.raft.RaftVersions;
52 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
53 import org.opendaylight.controller.cluster.raft.VotingState;
54 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
55 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
56 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
57 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
58 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
59 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
60 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
61 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
62 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
63 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
64 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
65 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
66 import org.opendaylight.controller.cluster.raft.messages.Payload;
67 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
68 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
69 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
70 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
71 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
72 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
73 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
74 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
75 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
76 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
77 import org.opendaylight.yangtools.concepts.Identifier;
78 import scala.concurrent.duration.FiniteDuration;
80 public class LeaderTest extends AbstractLeaderTest<Leader> {
82 static final String FOLLOWER_ID = "follower";
83 public static final String LEADER_ID = "leader";
85 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
86 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
88 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
89 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
91 private Leader leader;
92 private final short payloadVersion = 5;
96 public void tearDown() {
105 public void testHandleMessageForUnknownMessage() {
106 logStart("testHandleMessageForUnknownMessage");
108 leader = new Leader(createActorContext());
110 // handle message should null when it receives an unknown message
111 assertNull(leader.handleMessage(followerActor, "foo"));
// Checks the AppendEntries traffic a new leader sends to its follower: first an empty heartbeat, then -
// once the follower has reported (lastIndex - 1) as its last entry and a heartbeat interval has passed -
// an AppendEntries carrying exactly the last log entry.
// NOTE(review): 'term' is used below but its declaration is not among the visible lines of this
// method - presumably a local long declared just before the update() call; confirm.
115 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
116 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
118 MockRaftActorContext actorContext = createActorContextWithFollower();
119 actorContext.setCommitIndex(-1);
120 actorContext.setPayloadVersion(payloadVersion);
123 actorContext.getTermInformation().update(term, "");
125 leader = new Leader(actorContext);
126 actorContext.setCurrentBehavior(leader);
128 // Leader should send an immediate heartbeat with no entries as follower is inactive.
129 final long lastIndex = actorContext.getReplicatedLog().lastIndex();
130 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
131 assertEquals("getTerm", term, appendEntries.getTerm());
132 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
133 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
134 assertEquals("Entries size", 0, appendEntries.getEntries().size());
135 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
137 // The follower would normally reply - simulate that explicitly here. The reply reports
// (lastIndex - 1), i.e. the follower is one entry behind the leader.
138 leader.handleMessage(followerActor, new AppendEntriesReply(
139 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
140 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop what the follower actor has collected so far so that only the next AppendEntries is examined.
142 followerActor.underlyingActor().clear();
144 // Sleep for the heartbeat interval so AppendEntries is sent.
145 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
146 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
148 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
// The heartbeat is expected to carry the single entry the follower is missing.
150 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
151 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
152 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
153 assertEquals("Entries size", 1, appendEntries.getEntries().size());
154 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
155 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
156 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
160 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
161 return sendReplicate(actorContext, 1, index);
164 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term,
166 return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
169 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
170 final Payload payload) {
171 actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(index, term, payload));
172 return leader.handleMessage(leaderActor, new Replicate(index, true, null, null));
// Checks that a Replicate request makes the leader send the new entry to an up-to-date follower in a
// single AppendEntries, and that the commit index does not move before the follower acknowledges it.
// NOTE(review): 'term' is used below but its declaration is not among the visible lines of this
// method - presumably a local long declared just before the update() call; confirm.
176 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
177 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
179 MockRaftActorContext actorContext = createActorContextWithFollower();
182 actorContext.getTermInformation().update(term, "");
184 leader = new Leader(actorContext);
186 // Leader will send an immediate heartbeat - ignore it.
187 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
189 // The follower would normally reply - simulate that explicitly here.
190 long lastIndex = actorContext.getReplicatedLog().lastIndex();
191 leader.handleMessage(followerActor, new AppendEntriesReply(
192 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
193 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so that only the AppendEntries caused by the Replicate is examined.
195 followerActor.underlyingActor().clear();
197 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
199 // State should not change
200 assertTrue(raftBehavior instanceof Leader);
202 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
203 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
204 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
205 assertEquals("Entries size", 1, appendEntries.getEntries().size());
206 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
207 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
208 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// No reply for the new entry has been handled yet, so the commit index is expected to still be lastIndex.
209 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
213 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() {
214 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
216 MockRaftActorContext actorContext = createActorContextWithFollower();
217 actorContext.setCommitIndex(-1);
218 actorContext.setLastApplied(-1);
220 // The raft context is initialized with a couple log entries. However the commitIndex
221 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
222 // committed and applied. Now it regains leadership with a higher term (2).
223 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
224 long newTerm = prevTerm + 1;
225 actorContext.getTermInformation().update(newTerm, "");
227 leader = new Leader(actorContext);
228 actorContext.setCurrentBehavior(leader);
230 // Leader will send an immediate heartbeat - ignore it.
231 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
233 // The follower replies with the leader's current last index and term, simulating that it is
234 // up to date with the leader.
235 long lastIndex = actorContext.getReplicatedLog().lastIndex();
236 leader.handleMessage(followerActor, new AppendEntriesReply(
237 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
239 // The commit index should not get updated even though consensus was reached. This is b/c the
240 // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
241 // from previous terms by counting replicas".
242 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
244 followerActor.underlyingActor().clear();
246 // Now replicate a new entry with the new term 2.
247 long newIndex = lastIndex + 1;
248 sendReplicate(actorContext, newTerm, newIndex);
250 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
251 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
252 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
253 assertEquals("Entries size", 1, appendEntries.getEntries().size());
254 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
255 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
256 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
258 // The follower replies with success. The leader should now update the commit index to the new index
259 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
260 // prior entries are committed indirectly".
261 leader.handleMessage(followerActor, new AppendEntriesReply(
262 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
264 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
// Same flow as the plain Replicate test, but with a custom RaftPolicy installed: here the commit index
// is expected to advance to the new entry right away, before any follower reply for it is handled.
// NOTE(review): createRaftPolicy(true, true) is defined outside this view - presumably it yields a
// policy that lets the leader apply modifications without waiting for consensus; confirm.
// NOTE(review): 'term' is used below but its declaration is not among the visible lines of this
// method - presumably a local long declared just before the update() call; confirm.
268 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() {
269 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
271 MockRaftActorContext actorContext = createActorContextWithFollower();
272 actorContext.setRaftPolicy(createRaftPolicy(true, true));
275 actorContext.getTermInformation().update(term, "");
277 leader = new Leader(actorContext);
279 // Leader will send an immediate heartbeat - ignore it.
280 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
282 // The follower would normally reply - simulate that explicitly here.
283 long lastIndex = actorContext.getReplicatedLog().lastIndex();
284 leader.handleMessage(followerActor, new AppendEntriesReply(
285 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
286 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so that only the AppendEntries caused by the Replicate is examined.
288 followerActor.underlyingActor().clear();
290 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
292 // State should not change
293 assertTrue(raftBehavior instanceof Leader);
295 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
296 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
297 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
298 assertEquals("Entries size", 1, appendEntries.getEntries().size());
299 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
300 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
301 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// Unlike the default-policy test, the commit index already points at the new entry here.
302 assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
306 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
307 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
309 MockRaftActorContext actorContext = createActorContextWithFollower();
310 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
312 public FiniteDuration getHeartBeatInterval() {
313 return FiniteDuration.apply(5, TimeUnit.SECONDS);
318 actorContext.getTermInformation().update(term, "");
320 leader = new Leader(actorContext);
322 // Leader will send an immediate heartbeat - ignore it.
323 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
325 // The follower would normally reply - simulate that explicitly here.
326 long lastIndex = actorContext.getReplicatedLog().lastIndex();
327 leader.handleMessage(followerActor, new AppendEntriesReply(
328 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
329 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
331 followerActor.underlyingActor().clear();
333 for (int i = 0; i < 5; i++) {
334 sendReplicate(actorContext, lastIndex + i + 1);
337 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
338 // We expect only 1 message to be sent because of two reasons,
339 // - an append entries reply was not received
340 // - the heartbeat interval has not expired
341 // In this scenario if multiple messages are sent they would likely be duplicates
342 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
// Checks that, when every Replicate is followed by a successful follower reply, the leader sends two
// AppendEntries per entry (one carrying the entry, one empty with the advanced leaderCommit), and that
// a Replicate issued while an earlier one is still unacknowledged sends nothing further.
// NOTE(review): 'term' is used below but its declaration is not among the visible lines of this
// method - presumably a local long declared just before the update() call; confirm.
346 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
347 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
349 MockRaftActorContext actorContext = createActorContextWithFollower();
// Heartbeat interval of 5 seconds, so no heartbeat interval elapses while the test runs.
350 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
352 public FiniteDuration getHeartBeatInterval() {
353 return FiniteDuration.apply(5, TimeUnit.SECONDS);
358 actorContext.getTermInformation().update(term, "");
360 leader = new Leader(actorContext);
362 // Leader will send an immediate heartbeat - ignore it.
363 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
365 // The follower would normally reply - simulate that explicitly here.
366 long lastIndex = actorContext.getReplicatedLog().lastIndex();
367 leader.handleMessage(followerActor, new AppendEntriesReply(
368 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
369 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
371 followerActor.underlyingActor().clear();
// Replicate three entries, acknowledging each one before the next is replicated.
373 for (int i = 0; i < 3; i++) {
374 sendReplicate(actorContext, lastIndex + i + 1);
375 leader.handleMessage(followerActor, new AppendEntriesReply(
376 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
379 // We are expecting six messages here -- for each of the three entries, a request to replicate and a consensus-reached message
380 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
381 assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
382 for (int i = 0; i < 3; i++) {
383 assertRequestEntry(lastIndex, allMessages, i);
384 assertCommitEntry(lastIndex, allMessages, i);
387 // Now replicate a fourth entry, this time without a follower reply
388 sendReplicate(actorContext, lastIndex + 3 + 1);
389 allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
390 // This elicits another message for request to replicate
391 assertEquals("The number of request entries collected", 7, allMessages.size());
392 assertRequestEntry(lastIndex, allMessages, 3);
// A fifth Replicate while the fourth is still unacknowledged must not send anything: the count stays at 7.
394 sendReplicate(actorContext, lastIndex + 4 + 1);
395 allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
396 assertEquals("The number of request entries collected", 7, allMessages.size());
399 private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
400 final int messageNr) {
401 final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
402 assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
403 assertEquals(List.of(), commitReq.getEntries());
406 private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
407 final int messageNr) {
408 final AppendEntries req = allMessages.get(2 * messageNr);
409 assertEquals(lastIndex + messageNr, req.getLeaderCommit());
411 final List<ReplicatedLogEntry> entries = req.getEntries();
412 assertEquals(1, entries.size());
413 assertEquals(messageNr + 2, entries.get(0).getIndex());
417 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
418 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
420 MockRaftActorContext actorContext = createActorContextWithFollower();
421 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
423 public FiniteDuration getHeartBeatInterval() {
424 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
429 actorContext.getTermInformation().update(term, "");
431 leader = new Leader(actorContext);
433 // Leader will send an immediate heartbeat - ignore it.
434 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
436 // The follower would normally reply - simulate that explicitly here.
437 long lastIndex = actorContext.getReplicatedLog().lastIndex();
438 leader.handleMessage(followerActor, new AppendEntriesReply(
439 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
440 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
442 followerActor.underlyingActor().clear();
444 sendReplicate(actorContext, lastIndex + 1);
446 // Wait slightly longer than heartbeat duration
447 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
449 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
451 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
452 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
454 assertEquals(1, allMessages.get(0).getEntries().size());
455 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
456 assertEquals(1, allMessages.get(1).getEntries().size());
457 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
// Checks that each SendHeartBeat handled after more than one heartbeat interval (100 ms configured,
// 150 ms slept) produces exactly one AppendEntries, even though the follower never replies in between.
// NOTE(review): 'term' is used below but its declaration is not among the visible lines of this
// method - presumably a local long declared just before the update() call; confirm.
462 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
463 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
465 MockRaftActorContext actorContext = createActorContextWithFollower();
466 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
468 public FiniteDuration getHeartBeatInterval() {
469 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
474 actorContext.getTermInformation().update(term, "");
476 leader = new Leader(actorContext);
478 // Leader will send an immediate heartbeat - ignore it.
479 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
481 // The follower would normally reply - simulate that explicitly here.
482 long lastIndex = actorContext.getReplicatedLog().lastIndex();
483 leader.handleMessage(followerActor, new AppendEntriesReply(
484 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
485 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
487 followerActor.underlyingActor().clear();
// Three rounds: sleep past the heartbeat interval, then trigger a heartbeat.
489 for (int i = 0; i < 3; i++) {
490 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
491 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
494 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
495 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
// Checks that a Replicate handled right after a heartbeat is still sent out: the follower must receive
// an empty heartbeat AppendEntries followed by an AppendEntries carrying the new entry.
// NOTE(review): 'term' is used below but its declaration is not among the visible lines of this
// method - presumably a local long declared just before the update() call; confirm.
499 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
500 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
502 MockRaftActorContext actorContext = createActorContextWithFollower();
503 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
505 public FiniteDuration getHeartBeatInterval() {
506 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
511 actorContext.getTermInformation().update(term, "");
513 leader = new Leader(actorContext);
515 // Leader will send an immediate heartbeat - ignore it.
516 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
518 // The follower would normally reply - simulate that explicitly here.
519 long lastIndex = actorContext.getReplicatedLog().lastIndex();
520 leader.handleMessage(followerActor, new AppendEntriesReply(
521 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
522 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
524 followerActor.underlyingActor().clear();
// Sleep past the 100 ms heartbeat interval, send the heartbeat, then replicate with no delay in between.
526 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
527 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
528 sendReplicate(actorContext, lastIndex + 1);
530 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
531 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// Message 0 is the empty heartbeat, message 1 carries the replicated entry.
533 assertEquals(0, allMessages.get(0).getEntries().size());
534 assertEquals(1, allMessages.get(1).getEntries().size());
539 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
540 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
542 MockRaftActorContext actorContext = createActorContext();
544 leader = new Leader(actorContext);
546 actorContext.setLastApplied(0);
548 final long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
549 final long term = actorContext.getTermInformation().getCurrentTerm();
550 final var data = new MockRaftActorContext.MockPayload("foo");
552 actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(newLogIndex, term, data));
554 final Identifier id = new MockIdentifier("state-id");
555 final var raftBehavior = leader.handleMessage(leaderActor, new Replicate(newLogIndex, true, leaderActor, id));
557 // State should not change
558 assertTrue(raftBehavior instanceof Leader);
560 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
562 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
563 // one since lastApplied state is 0.
564 final var applyStateList = MessageCollectorActor.getAllMatching(leaderActor, ApplyState.class);
565 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
567 for (int i = 0; i <= newLogIndex - 1; i++) {
568 ApplyState applyState = applyStateList.get(i);
569 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
570 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
573 ApplyState last = applyStateList.get((int) newLogIndex - 1);
574 assertEquals("getData", data, last.getReplicatedLogEntry().getData());
575 assertEquals("getIdentifier", id, last.getIdentifier());
// Checks heartbeat handling while a snapshot install to the follower is in progress: as long as the
// outstanding chunk has not been acknowledged the heartbeat yields an empty AppendEntries, and once the
// chunk is marked as sent successfully the next heartbeat yields an InstallSnapshot.
579 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
580 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
582 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Empty the leader's log so that only the snapshot state below describes it.
585 actorContext.getReplicatedLog().removeFrom(0);
587 final int commitIndex = 3;
588 final int snapshotIndex = 2;
589 final int snapshotTerm = 1;
591 // set the snapshot variables in replicatedlog
592 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
593 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
594 actorContext.setCommitIndex(commitIndex);
595 //set follower timeout to 2 mins, helps during debugging
596 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
598 leader = new Leader(actorContext);
// Put the follower at the very start of the log: nothing matched, next index 0.
600 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
601 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
603 //update follower timestamp
604 leader.markFollowerActive(FOLLOWER_ID);
// Hand the leader a snapshot and attach install state for it to the follower, as if an install
// had already been started.
606 ByteString bs = toByteString(Map.of("1", "A", "2", "B", "3", "C"));
607 leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
608 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
609 -1, null, null), ByteSource.wrap(bs.toByteArray())));
610 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
611 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
612 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
613 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
615 //send first chunk and no InstallSnapshotReply received yet
// NOTE(review): only the chunk index is advanced in the visible lines; no call that actually reads
// the first chunk appears here - confirm that nothing is missing before incrementChunkIndex().
617 fts.incrementChunkIndex();
619 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
620 TimeUnit.MILLISECONDS);
622 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
624 AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
626 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
628 //InstallSnapshotReply received
629 fts.markSendStatus(true);
631 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
633 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
// The snapshot held by the leader was created with commitIndex as its last included index.
635 assertEquals(commitIndex, is.getLastIncludedIndex());
639 public void testSendAppendEntriesSnapshotScenario() {
640 logStart("testSendAppendEntriesSnapshotScenario");
642 final MockRaftActorContext actorContext = createActorContextWithFollower();
644 Map<String, String> leadersSnapshot = new HashMap<>();
645 leadersSnapshot.put("1", "A");
646 leadersSnapshot.put("2", "B");
647 leadersSnapshot.put("3", "C");
650 actorContext.getReplicatedLog().removeFrom(0);
652 final int followersLastIndex = 2;
653 final int snapshotIndex = 3;
654 final int newEntryIndex = 4;
655 final int snapshotTerm = 1;
656 final int currentTerm = 2;
658 // set the snapshot variables in replicatedlog
659 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
660 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
661 actorContext.setCommitIndex(followersLastIndex);
663 leader = new Leader(actorContext);
665 // Leader will send an immediate heartbeat - ignore it.
666 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
669 actorContext.getReplicatedLog().append(
670 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")));
672 //update follower timestamp
673 leader.markFollowerActive(FOLLOWER_ID);
675 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
676 RaftActorBehavior raftBehavior = leader.handleMessage(
677 leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
679 assertTrue(raftBehavior instanceof Leader);
681 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
685 public void testInitiateInstallSnapshot() {
686 logStart("testInitiateInstallSnapshot");
688 MockRaftActorContext actorContext = createActorContextWithFollower();
691 actorContext.getReplicatedLog().removeFrom(0);
693 final int followersLastIndex = 2;
694 final int snapshotIndex = 3;
695 final int newEntryIndex = 4;
696 final int snapshotTerm = 1;
697 final int currentTerm = 2;
699 // set the snapshot variables in replicatedlog
700 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
701 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
702 actorContext.setLastApplied(3);
703 actorContext.setCommitIndex(followersLastIndex);
705 leader = new Leader(actorContext);
707 // Leader will send an immediate heartbeat - ignore it.
708 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
710 // set the snapshot as absent and check if capture-snapshot is invoked.
711 leader.setSnapshotHolder(null);
714 actorContext.getReplicatedLog().append(
715 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")));
717 //update follower timestamp
718 leader.markFollowerActive(FOLLOWER_ID);
720 leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
722 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
724 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
726 assertEquals(3, cs.getLastAppliedIndex());
727 assertEquals(1, cs.getLastAppliedTerm());
728 assertEquals(4, cs.getLastIndex());
729 assertEquals(2, cs.getLastTerm());
731 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
732 leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
734 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
738 public void testInitiateForceInstallSnapshot() throws Exception {
739 logStart("testInitiateForceInstallSnapshot");
741 MockRaftActorContext actorContext = createActorContextWithFollower();
743 final int followersLastIndex = 2;
744 final int snapshotIndex = -1;
745 final int newEntryIndex = 4;
746 final int snapshotTerm = -1;
747 final int currentTerm = 2;
749 // set the snapshot variables in replicatedlog
750 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
751 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
752 actorContext.setLastApplied(3);
753 actorContext.setCommitIndex(followersLastIndex);
755 actorContext.getReplicatedLog().removeFrom(0);
757 AtomicReference<Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
758 actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
760 leader = new Leader(actorContext);
761 actorContext.setCurrentBehavior(leader);
763 // Leader will send an immediate heartbeat - ignore it.
764 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
766 // set the snapshot as absent and check if capture-snapshot is invoked.
767 leader.setSnapshotHolder(null);
769 for (int i = 0; i < 4; i++) {
770 actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
771 new MockRaftActorContext.MockPayload("X" + i)));
775 actorContext.getReplicatedLog().append(
776 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")));
778 //update follower timestamp
779 leader.markFollowerActive(FOLLOWER_ID);
781 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
782 // installed with a SendInstallSnapshot
783 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
784 RaftVersions.CURRENT_VERSION));
786 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
788 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
789 assertEquals(3, cs.getLastAppliedIndex());
790 assertEquals(1, cs.getLastAppliedTerm());
791 assertEquals(4, cs.getLastIndex());
792 assertEquals(2, cs.getLastTerm());
794 assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
795 assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
797 MessageCollectorActor.clearMessages(followerActor);
799 // Sending Replicate message should not initiate another capture since the first is in progress.
800 leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
801 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
803 // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
804 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
805 RaftVersions.CURRENT_VERSION));
806 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
808 // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
809 final byte[] bytes = new byte[]{1, 2, 3};
810 installSnapshotStream.get().orElseThrow().write(bytes);
811 actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
812 Runtime.getRuntime().totalMemory());
813 MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
815 // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
816 MessageCollectorActor.clearMessages(followerActor);
817 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
818 RaftVersions.CURRENT_VERSION));
819 MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
824 public void testInstallSnapshot() {
825 logStart("testInstallSnapshot");
827 final MockRaftActorContext actorContext = createActorContextWithFollower();
829 Map<String, String> leadersSnapshot = new HashMap<>();
830 leadersSnapshot.put("1", "A");
831 leadersSnapshot.put("2", "B");
832 leadersSnapshot.put("3", "C");
835 actorContext.getReplicatedLog().removeFrom(0);
837 final int lastAppliedIndex = 3;
838 final int snapshotIndex = 2;
839 final int snapshotTerm = 1;
840 final int currentTerm = 2;
842 // set the snapshot variables in replicatedlog
843 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
844 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
845 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
846 actorContext.setCommitIndex(lastAppliedIndex);
847 actorContext.setLastApplied(lastAppliedIndex);
849 leader = new Leader(actorContext);
851 // Initial heartbeat.
852 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
854 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
855 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
857 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
858 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(),
859 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
861 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
862 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
864 assertTrue(raftBehavior instanceof Leader);
866 // check if installsnapshot gets called with the correct values.
868 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
869 InstallSnapshot.class);
871 assertNotNull(installSnapshot.getData());
872 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
873 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
875 assertEquals(currentTerm, installSnapshot.getTerm());
879 public void testForceInstallSnapshot() {
880 logStart("testForceInstallSnapshot");
882 final MockRaftActorContext actorContext = createActorContextWithFollower();
884 Map<String, String> leadersSnapshot = new HashMap<>();
885 leadersSnapshot.put("1", "A");
886 leadersSnapshot.put("2", "B");
887 leadersSnapshot.put("3", "C");
889 final int lastAppliedIndex = 3;
890 final int snapshotIndex = -1;
891 final int snapshotTerm = -1;
892 final int currentTerm = 2;
894 // set the snapshot variables in replicatedlog
895 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
896 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
897 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
898 actorContext.setCommitIndex(lastAppliedIndex);
899 actorContext.setLastApplied(lastAppliedIndex);
901 leader = new Leader(actorContext);
903 // Initial heartbeat.
904 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
906 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
907 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
909 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
910 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(),
911 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
913 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
914 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
916 assertTrue(raftBehavior instanceof Leader);
918 // check if installsnapshot gets called with the correct values.
920 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
921 InstallSnapshot.class);
923 assertNotNull(installSnapshot.getData());
924 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
925 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
927 assertEquals(currentTerm, installSnapshot.getTerm());
// Verifies the leader's bookkeeping once the follower successfully acknowledges the last snapshot chunk:
// the follower's install-snapshot state is cleared, its match index becomes commitIndex and its next
// index commitIndex + 1, and the leader no longer holds a snapshot.
931 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
932 logStart("testHandleInstallSnapshotReplyLastChunk");
934 MockRaftActorContext actorContext = createActorContextWithFollower();
936 final int commitIndex = 3;
937 final int snapshotIndex = 2;
938 final int snapshotTerm = 1;
939 final int currentTerm = 2;
941 actorContext.setCommitIndex(commitIndex);
943 leader = new Leader(actorContext);
944 actorContext.setCurrentBehavior(leader);
// Put the follower in a "knows nothing yet" position before the install is simulated.
946 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
947 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
949 // Ignore initial heartbeat.
950 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// State that is serialized into the snapshot payload.
952 Map<String, String> leadersSnapshot = new HashMap<>();
953 leadersSnapshot.put("1", "A");
954 leadersSnapshot.put("2", "B");
955 leadersSnapshot.put("3", "C");
957 // set the snapshot variables in replicatedlog
959 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
960 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
961 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
// Hand the leader a snapshot directly and attach a chunk tracker to the follower, instead of going
// through a SendInstallSnapshot message.
963 ByteString bs = toByteString(leadersSnapshot);
964 leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
965 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
966 -1, null, null), ByteSource.wrap(bs.toByteArray())));
967 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
968 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
969 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
970 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
// Advance the tracker until its chunk index is the last chunk, so the reply below acknowledges the
// final chunk.
// NOTE(review): this listing appears to be missing a line inside the loop - confirm against the file.
971 while (!fts.isLastChunk(fts.getChunkIndex())) {
973 fts.incrementChunkIndex();
// Empty the leader's log before the reply is handled.
977 actorContext.getReplicatedLog().removeFrom(0);
// Successful reply for the chunk index the tracker currently points at (the last chunk).
979 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
980 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
982 assertTrue(raftBehavior instanceof Leader);
984 assertEquals(1, leader.followerLogSize());
985 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
// The install is finished: tracker removed, follower indexes moved up to the snapshot, snapshot released.
987 assertNull(fli.getInstallSnapshotState());
988 assertEquals(commitIndex, fli.getMatchIndex());
989 assertEquals(commitIndex + 1, fli.getNextIndex());
990 assertFalse(leader.hasSnapshot());
// Verifies that the leader walks through a three-chunk snapshot install: each successful
// InstallSnapshotReply makes it send the next chunk, and a reply to the final chunk sends nothing more.
994 public void testSendSnapshotfromInstallSnapshotReply() {
995 logStart("testSendSnapshotfromInstallSnapshotReply");
997 MockRaftActorContext actorContext = createActorContextWithFollower();
999 final int commitIndex = 3;
1000 final int snapshotIndex = 2;
1001 final int snapshotTerm = 1;
1002 final int currentTerm = 2;
// Override the snapshot chunk size; the assertions below expect the snapshot to span 3 chunks.
// NOTE(review): the body of getSnapshotChunkSize() is not visible in this listing - confirm the value.
1004 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
1006 public int getSnapshotChunkSize() {
// presumably these long intervals keep timer-driven messages out of the collected ones - TODO confirm
1010 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1011 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1013 actorContext.setConfigParams(configParams);
1014 actorContext.setCommitIndex(commitIndex);
1016 leader = new Leader(actorContext);
1017 actorContext.setCurrentBehavior(leader);
1019 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1020 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// State that is serialized into the snapshot payload.
1022 Map<String, String> leadersSnapshot = new HashMap<>();
1023 leadersSnapshot.put("1", "A");
1024 leadersSnapshot.put("2", "B");
1025 leadersSnapshot.put("3", "C");
1027 // set the snapshot variables in replicatedlog
1028 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1029 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1030 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1032 ByteString bs = toByteString(leadersSnapshot);
1033 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1034 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
// Kick off the install; the first chunk (index 1 of 3) must reach the follower.
1036 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1038 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1039 InstallSnapshot.class);
1041 assertEquals(1, installSnapshot.getChunkIndex());
1042 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 1 successfully and expect chunk 2.
1044 followerActor.underlyingActor().clear();
1045 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1046 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1048 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1050 assertEquals(2, installSnapshot.getChunkIndex());
1051 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 2 successfully and expect another InstallSnapshot (its index is not asserted here).
1053 followerActor.underlyingActor().clear();
1054 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1055 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1057 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1059 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1060 followerActor.underlyingActor().clear();
1061 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1062 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
// getFirstMatching (unlike expectFirstMatching above) is used so that "nothing received" yields null.
// NOTE(review): assumes getFirstMatching returns null when no message matches - confirm.
1064 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1066 assertNull(installSnapshot);
// Verifies that after a failed InstallSnapshotReply carrying chunk index -1 the leader sends chunk 1
// (of 3) again on the next heartbeat.
1071 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() {
1072 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1074 MockRaftActorContext actorContext = createActorContextWithFollower();
1076 final int commitIndex = 3;
1077 final int snapshotIndex = 2;
1078 final int snapshotTerm = 1;
1079 final int currentTerm = 2;
// Override the snapshot chunk size; the assertions below expect the snapshot to span 3 chunks.
// NOTE(review): the body of getSnapshotChunkSize() is not visible in this listing - confirm the value.
1081 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1083 public int getSnapshotChunkSize() {
1088 actorContext.setCommitIndex(commitIndex);
1090 leader = new Leader(actorContext);
1092 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1093 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// State that is serialized into the snapshot payload.
1095 Map<String, String> leadersSnapshot = new HashMap<>();
1096 leadersSnapshot.put("1", "A");
1097 leadersSnapshot.put("2", "B");
1098 leadersSnapshot.put("3", "C");
1100 // set the snapshot variables in replicatedlog
1101 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1102 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1103 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1105 ByteString bs = toByteString(leadersSnapshot);
1106 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1107 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
// NOTE(review): the purpose of this 1 second pause is not evident from the test itself - confirm it is needed.
1109 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
// Kick off the install; the first chunk (index 1 of 3) must reach the follower.
1110 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1112 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1113 InstallSnapshot.class);
1115 assertEquals(1, installSnapshot.getChunkIndex());
1116 assertEquals(3, installSnapshot.getTotalChunks());
1118 followerActor.underlyingActor().clear();
// Failed reply with chunk index -1.
1120 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1121 FOLLOWER_ID, -1, false));
// Let one heartbeat interval elapse, then trigger the heartbeat by hand.
1123 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1124 TimeUnit.MILLISECONDS);
1126 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
// The heartbeat must cause the first chunk to be sent again.
1128 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1130 assertEquals(1, installSnapshot.getChunkIndex());
1131 assertEquals(3, installSnapshot.getTotalChunks());
// Verifies the "last chunk hash code" carried by InstallSnapshot messages: the first chunk carries
// LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE, the second carries Arrays.hashCode of the
// first chunk's data.
1135 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() {
1136 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1138 MockRaftActorContext actorContext = createActorContextWithFollower();
1140 final int commitIndex = 3;
1141 final int snapshotIndex = 2;
1142 final int snapshotTerm = 1;
1143 final int currentTerm = 2;
// Override the snapshot chunk size; the assertions below expect the snapshot to span 3 chunks.
// NOTE(review): the body of getSnapshotChunkSize() is not visible in this listing - confirm the value.
1145 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1147 public int getSnapshotChunkSize() {
1152 actorContext.setCommitIndex(commitIndex);
1154 leader = new Leader(actorContext);
1156 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1157 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// State that is serialized into the snapshot payload.
1159 Map<String, String> leadersSnapshot = new HashMap<>();
1160 leadersSnapshot.put("1", "A");
1161 leadersSnapshot.put("2", "B");
1162 leadersSnapshot.put("3", "C");
1164 // set the snapshot variables in replicatedlog
1165 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1166 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1167 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1169 ByteString bs = toByteString(leadersSnapshot);
1170 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1171 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
// Kick off the install and inspect the first chunk.
1173 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1175 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1176 InstallSnapshot.class);
1178 assertEquals(1, installSnapshot.getChunkIndex());
1179 assertEquals(3, installSnapshot.getTotalChunks());
1180 assertEquals(OptionalInt.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE),
1181 installSnapshot.getLastChunkHashCode());
// Remember the hash of the first chunk's data; the next chunk is expected to report it.
1183 final int hashCode = Arrays.hashCode(installSnapshot.getData());
1185 followerActor.underlyingActor().clear();
// Acknowledge chunk 1 successfully so that chunk 2 is sent.
1187 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1188 FOLLOWER_ID, 1, true));
1190 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1192 assertEquals(2, installSnapshot.getChunkIndex());
1193 assertEquals(3, installSnapshot.getTotalChunks());
1194 assertEquals(OptionalInt.of(hashCode), installSnapshot.getLastChunkHashCode());
1198 public void testLeaderInstallSnapshotState() throws IOException {
1199 logStart("testLeaderInstallSnapshotState");
1201 Map<String, String> leadersSnapshot = new HashMap<>();
1202 leadersSnapshot.put("1", "A");
1203 leadersSnapshot.put("2", "B");
1204 leadersSnapshot.put("3", "C");
1206 ByteString bs = toByteString(leadersSnapshot);
1207 byte[] barray = bs.toByteArray();
1209 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1210 fts.setSnapshotBytes(ByteSource.wrap(barray));
1212 assertEquals(bs.size(), barray.length);
1215 for (int i = 0; i < barray.length; i = i + 50) {
1216 int length = i + 50;
1219 if (i + 50 > barray.length) {
1220 length = barray.length;
1223 byte[] chunk = fts.getNextChunk();
1224 assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1225 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1227 fts.markSendStatus(true);
1228 if (!fts.isLastChunk(chunkIndex)) {
1229 fts.incrementChunkIndex();
1233 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1238 protected Leader createBehavior(final RaftActorContext actorContext) {
1239 return new Leader(actorContext);
1243 protected MockRaftActorContext createActorContext() {
1244 return createActorContext(leaderActor);
1248 protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
1249 return createActorContext(LEADER_ID, actorRef);
1252 private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
1253 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1254 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1255 configParams.setElectionTimeoutFactor(100000);
1256 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1257 context.setConfigParams(configParams);
1258 context.setPayloadVersion(payloadVersion);
1262 private MockRaftActorContext createActorContextWithFollower() {
1263 MockRaftActorContext actorContext = createActorContext();
1264 actorContext.setPeerAddresses(Map.of(FOLLOWER_ID, followerActor.path().toString()));
1265 return actorContext;
1268 private MockRaftActorContext createFollowerActorContextWithLeader() {
1269 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1270 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1271 followerConfig.setElectionTimeoutFactor(10000);
1272 followerActorContext.setConfigParams(followerConfig);
1273 followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString()));
1274 return followerActorContext;
1278 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() {
1279 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1281 final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1283 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1285 Follower follower = new Follower(followerActorContext);
1286 followerActor.underlyingActor().setBehavior(follower);
1287 followerActorContext.setCurrentBehavior(follower);
1289 Map<String, String> peerAddresses = new HashMap<>();
1290 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1292 leaderActorContext.setPeerAddresses(peerAddresses);
1294 leaderActorContext.getReplicatedLog().removeFrom(0);
1297 leaderActorContext.setReplicatedLog(
1298 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1300 leaderActorContext.setCommitIndex(1);
1302 followerActorContext.getReplicatedLog().removeFrom(0);
1304 // follower too has the exact same log entries and has the same commit index
1305 followerActorContext.setReplicatedLog(
1306 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1308 followerActorContext.setCommitIndex(1);
1310 leader = new Leader(leaderActorContext);
1311 leaderActorContext.setCurrentBehavior(leader);
1313 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1315 assertEquals(-1, appendEntries.getLeaderCommit());
1316 assertEquals(0, appendEntries.getEntries().size());
1317 assertEquals(0, appendEntries.getPrevLogIndex());
1319 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1320 leaderActor, AppendEntriesReply.class);
1322 assertEquals(2, appendEntriesReply.getLogLastIndex());
1323 assertEquals(1, appendEntriesReply.getLogLastTerm());
1325 // follower returns its next index
1326 assertEquals(2, appendEntriesReply.getLogLastIndex());
1327 assertEquals(1, appendEntriesReply.getLogLastTerm());
1333 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() {
1334 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1336 final MockRaftActorContext leaderActorContext = createActorContext();
1338 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1339 followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString()));
1341 Follower follower = new Follower(followerActorContext);
1342 followerActor.underlyingActor().setBehavior(follower);
1343 followerActorContext.setCurrentBehavior(follower);
1345 Map<String, String> leaderPeerAddresses = new HashMap<>();
1346 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1348 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1350 leaderActorContext.getReplicatedLog().removeFrom(0);
1352 leaderActorContext.setReplicatedLog(
1353 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1355 leaderActorContext.setCommitIndex(1);
1357 followerActorContext.getReplicatedLog().removeFrom(0);
1359 followerActorContext.setReplicatedLog(
1360 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1362 // follower has the same log entries but its commit index > leaders commit index
1363 followerActorContext.setCommitIndex(2);
1365 leader = new Leader(leaderActorContext);
1367 // Initial heartbeat
1368 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1370 assertEquals(-1, appendEntries.getLeaderCommit());
1371 assertEquals(0, appendEntries.getEntries().size());
1372 assertEquals(0, appendEntries.getPrevLogIndex());
1374 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1375 leaderActor, AppendEntriesReply.class);
1377 assertEquals(2, appendEntriesReply.getLogLastIndex());
1378 assertEquals(1, appendEntriesReply.getLogLastTerm());
1380 leaderActor.underlyingActor().setBehavior(follower);
1381 leader.handleMessage(followerActor, appendEntriesReply);
1383 leaderActor.underlyingActor().clear();
1384 followerActor.underlyingActor().clear();
1386 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1387 TimeUnit.MILLISECONDS);
1389 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1391 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1393 assertEquals(2, appendEntries.getLeaderCommit());
1394 assertEquals(0, appendEntries.getEntries().size());
1395 assertEquals(2, appendEntries.getPrevLogIndex());
1397 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1399 assertEquals(2, appendEntriesReply.getLogLastIndex());
1400 assertEquals(1, appendEntriesReply.getLogLastTerm());
1402 assertEquals(2, followerActorContext.getCommitIndex());
1408 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1409 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1411 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1412 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1413 new FiniteDuration(1000, TimeUnit.SECONDS));
1415 leaderActorContext.setReplicatedLog(
1416 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1417 long leaderCommitIndex = 2;
1418 leaderActorContext.setCommitIndex(leaderCommitIndex);
1419 leaderActorContext.setLastApplied(leaderCommitIndex);
1421 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1422 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1424 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1426 followerActorContext.setReplicatedLog(
1427 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1428 followerActorContext.setCommitIndex(0);
1429 followerActorContext.setLastApplied(0);
1431 Follower follower = new Follower(followerActorContext);
1432 followerActor.underlyingActor().setBehavior(follower);
1434 leader = new Leader(leaderActorContext);
1436 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1437 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1438 AppendEntriesReply.class);
1440 MessageCollectorActor.clearMessages(followerActor);
1441 MessageCollectorActor.clearMessages(leaderActor);
1443 // Verify initial AppendEntries sent.
1444 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1445 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1446 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1448 leaderActor.underlyingActor().setBehavior(leader);
1450 leader.handleMessage(followerActor, appendEntriesReply);
1452 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1453 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1455 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1456 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1457 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1459 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1460 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1461 appendEntries.getEntries().get(0).getData());
1462 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1463 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1464 appendEntries.getEntries().get(1).getData());
1466 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1467 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1469 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1471 ApplyState applyState = applyStateList.get(0);
1472 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1473 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1474 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1475 applyState.getReplicatedLogEntry().getData());
1477 applyState = applyStateList.get(1);
1478 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1479 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1480 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1481 applyState.getReplicatedLogEntry().getData());
1483 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1484 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1488 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1489 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1491 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1492 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1493 new FiniteDuration(1000, TimeUnit.SECONDS));
1495 leaderActorContext.setReplicatedLog(
1496 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1497 long leaderCommitIndex = 1;
1498 leaderActorContext.setCommitIndex(leaderCommitIndex);
1499 leaderActorContext.setLastApplied(leaderCommitIndex);
1501 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1502 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1504 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1506 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1507 followerActorContext.setCommitIndex(-1);
1508 followerActorContext.setLastApplied(-1);
1510 Follower follower = new Follower(followerActorContext);
1511 followerActor.underlyingActor().setBehavior(follower);
1512 followerActorContext.setCurrentBehavior(follower);
1514 leader = new Leader(leaderActorContext);
1516 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1517 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1518 AppendEntriesReply.class);
1520 MessageCollectorActor.clearMessages(followerActor);
1521 MessageCollectorActor.clearMessages(leaderActor);
1523 // Verify initial AppendEntries sent with the leader's current commit index.
1524 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1525 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1526 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1528 leaderActor.underlyingActor().setBehavior(leader);
1529 leaderActorContext.setCurrentBehavior(leader);
1531 leader.handleMessage(followerActor, appendEntriesReply);
1533 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1534 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1536 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1537 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1538 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1540 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1541 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1542 appendEntries.getEntries().get(0).getData());
1543 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1544 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1545 appendEntries.getEntries().get(1).getData());
1547 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1548 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1550 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1552 ApplyState applyState = applyStateList.get(0);
1553 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1554 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1555 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1556 applyState.getReplicatedLogEntry().getData());
1558 applyState = applyStateList.get(1);
1559 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1560 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1561 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1562 applyState.getReplicatedLogEntry().getData());
1564 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1565 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1569 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1570 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1572 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1573 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1574 new FiniteDuration(1000, TimeUnit.SECONDS));
1576 leaderActorContext.setReplicatedLog(
1577 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1578 long leaderCommitIndex = 1;
1579 leaderActorContext.setCommitIndex(leaderCommitIndex);
1580 leaderActorContext.setLastApplied(leaderCommitIndex);
1582 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1583 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1585 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1587 followerActorContext.setReplicatedLog(
1588 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1589 followerActorContext.setCommitIndex(-1);
1590 followerActorContext.setLastApplied(-1);
1592 Follower follower = new Follower(followerActorContext);
1593 followerActor.underlyingActor().setBehavior(follower);
1594 followerActorContext.setCurrentBehavior(follower);
1596 leader = new Leader(leaderActorContext);
1598 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1599 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1600 AppendEntriesReply.class);
1602 MessageCollectorActor.clearMessages(followerActor);
1603 MessageCollectorActor.clearMessages(leaderActor);
1605 // Verify initial AppendEntries sent with the leader's current commit index.
1606 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1607 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1608 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1610 leaderActor.underlyingActor().setBehavior(leader);
1611 leaderActorContext.setCurrentBehavior(leader);
1613 leader.handleMessage(followerActor, appendEntriesReply);
1615 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1616 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1618 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1619 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1620 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1622 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1623 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1624 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1625 appendEntries.getEntries().get(0).getData());
1626 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1627 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1628 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1629 appendEntries.getEntries().get(1).getData());
1631 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1632 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1634 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1636 ApplyState applyState = applyStateList.get(0);
1637 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1638 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1639 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1640 applyState.getReplicatedLogEntry().getData());
1642 applyState = applyStateList.get(1);
1643 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1644 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1645 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1646 applyState.getReplicatedLogEntry().getData());
1648 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1649 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1650 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1654 public void testHandleAppendEntriesReplyWithNewerTerm() {
1655 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1657 MockRaftActorContext leaderActorContext = createActorContext();
1658 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1659 new FiniteDuration(10000, TimeUnit.SECONDS));
1661 leaderActorContext.setReplicatedLog(
1662 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1664 leader = new Leader(leaderActorContext);
1665 leaderActor.underlyingActor().setBehavior(leader);
1666 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1668 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1669 AppendEntriesReply.class);
1671 assertEquals(false, appendEntriesReply.isSuccess());
1672 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1674 MessageCollectorActor.clearMessages(leaderActor);
1678 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1679 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1681 MockRaftActorContext leaderActorContext = createActorContext();
1682 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1683 new FiniteDuration(10000, TimeUnit.SECONDS));
1685 leaderActorContext.setReplicatedLog(
1686 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1687 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1689 leader = new Leader(leaderActorContext);
1690 leaderActor.underlyingActor().setBehavior(leader);
1691 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1693 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1694 AppendEntriesReply.class);
1696 assertEquals(false, appendEntriesReply.isSuccess());
1697 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1699 MessageCollectorActor.clearMessages(leaderActor);
1703 public void testHandleAppendEntriesReplySuccess() {
1704 logStart("testHandleAppendEntriesReplySuccess");
1706 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1708 leaderActorContext.setReplicatedLog(
1709 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1711 leaderActorContext.setCommitIndex(1);
1712 leaderActorContext.setLastApplied(1);
1713 leaderActorContext.getTermInformation().update(1, "leader");
1715 leader = new Leader(leaderActorContext);
1717 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1719 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1720 assertEquals(RaftVersions.FLUORINE_VERSION, followerInfo.getRaftVersion());
1722 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1724 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1726 assertEquals(RaftState.Leader, raftActorBehavior.state());
1728 assertEquals(2, leaderActorContext.getCommitIndex());
1730 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1731 leaderActor, ApplyJournalEntries.class);
1733 assertEquals(2, leaderActorContext.getLastApplied());
1735 assertEquals(2, applyJournalEntries.getToIndex());
1737 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1740 assertEquals(1,applyStateList.size());
1742 ApplyState applyState = applyStateList.get(0);
1744 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1746 assertEquals(2, followerInfo.getMatchIndex());
1747 assertEquals(3, followerInfo.getNextIndex());
1748 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1749 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1753 public void testHandleAppendEntriesReplyUnknownFollower() {
1754 logStart("testHandleAppendEntriesReplyUnknownFollower");
1756 MockRaftActorContext leaderActorContext = createActorContext();
1758 leader = new Leader(leaderActorContext);
1760 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1762 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1764 assertEquals(RaftState.Leader, raftActorBehavior.state());
1768 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1769 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1771 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1772 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1773 new FiniteDuration(1000, TimeUnit.SECONDS));
1774 // Note: the size here depends on estimate
1775 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(246);
1777 leaderActorContext.setReplicatedLog(
1778 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1779 long leaderCommitIndex = 3;
1780 leaderActorContext.setCommitIndex(leaderCommitIndex);
1781 leaderActorContext.setLastApplied(leaderCommitIndex);
1783 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1784 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1785 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1786 final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1788 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1790 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1791 followerActorContext.setCommitIndex(-1);
1792 followerActorContext.setLastApplied(-1);
1794 Follower follower = new Follower(followerActorContext);
1795 followerActor.underlyingActor().setBehavior(follower);
1796 followerActorContext.setCurrentBehavior(follower);
1798 leader = new Leader(leaderActorContext);
1800 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1801 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1802 AppendEntriesReply.class);
1804 MessageCollectorActor.clearMessages(followerActor);
1805 MessageCollectorActor.clearMessages(leaderActor);
1807 // Verify initial AppendEntries sent with the leader's current commit index.
1808 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1809 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1810 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1812 leaderActor.underlyingActor().setBehavior(leader);
1813 leaderActorContext.setCurrentBehavior(leader);
1815 leader.handleMessage(followerActor, appendEntriesReply);
1817 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1818 AppendEntries.class, 2);
1819 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1821 appendEntries = appendEntriesList.get(0);
1822 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1823 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1824 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1826 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1827 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1828 appendEntries.getEntries().get(0).getData());
1829 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1830 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1831 appendEntries.getEntries().get(1).getData());
1833 appendEntries = appendEntriesList.get(1);
1834 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1835 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1836 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1838 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1839 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1840 appendEntries.getEntries().get(0).getData());
1841 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1842 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1843 appendEntries.getEntries().get(1).getData());
1845 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1846 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1848 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1850 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1851 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1855 public void testHandleRequestVoteReply() {
1856 logStart("testHandleRequestVoteReply");
1858 MockRaftActorContext leaderActorContext = createActorContext();
1860 leader = new Leader(leaderActorContext);
1862 // Should be a no-op.
1863 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1864 new RequestVoteReply(1, true));
1866 assertEquals(RaftState.Leader, raftActorBehavior.state());
1868 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1870 assertEquals(RaftState.Leader, raftActorBehavior.state());
1874 public void testIsolatedLeaderCheckNoFollowers() {
1875 logStart("testIsolatedLeaderCheckNoFollowers");
1877 MockRaftActorContext leaderActorContext = createActorContext();
1879 leader = new Leader(leaderActorContext);
1880 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1881 assertTrue(newBehavior instanceof Leader);
1885 public void testIsolatedLeaderCheckNoVotingFollowers() {
1886 logStart("testIsolatedLeaderCheckNoVotingFollowers");
1888 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1889 Follower follower = new Follower(followerActorContext);
1890 followerActor.underlyingActor().setBehavior(follower);
1892 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1893 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1894 new FiniteDuration(1000, TimeUnit.SECONDS));
1895 leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1897 leader = new Leader(leaderActorContext);
1898 leader.getFollower(FOLLOWER_ID).markFollowerActive();
1899 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1900 assertTrue("Expected Leader", newBehavior instanceof Leader);
1903 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
1904 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1905 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1907 MockRaftActorContext leaderActorContext = createActorContext();
1909 Map<String, String> peerAddresses = new HashMap<>();
1910 peerAddresses.put("follower-1", followerActor1.path().toString());
1911 peerAddresses.put("follower-2", followerActor2.path().toString());
1913 leaderActorContext.setPeerAddresses(peerAddresses);
1914 leaderActorContext.setRaftPolicy(raftPolicy);
1916 leader = new Leader(leaderActorContext);
1918 leader.markFollowerActive("follower-1");
1919 leader.markFollowerActive("follower-2");
1920 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1921 assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1923 // kill 1 follower and verify if that got killed
1924 final TestKit probe = new TestKit(getSystem());
1925 probe.watch(followerActor1);
1926 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1927 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1928 assertEquals(termMsg1.getActor(), followerActor1);
1930 leader.markFollowerInActive("follower-1");
1931 leader.markFollowerActive("follower-2");
1932 newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1933 assertTrue("Behavior not instance of Leader when majority of followers are active",
1934 newBehavior instanceof Leader);
1936 // kill 2nd follower and leader should change to Isolated leader
1937 followerActor2.tell(PoisonPill.getInstance(), null);
1938 probe.watch(followerActor2);
1939 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1940 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1941 assertEquals(termMsg2.getActor(), followerActor2);
1943 leader.markFollowerInActive("follower-2");
1944 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1948 public void testIsolatedLeaderCheckTwoFollowers() {
1949 logStart("testIsolatedLeaderCheckTwoFollowers");
1951 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1953 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1954 newBehavior instanceof IsolatedLeader);
1958 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() {
1959 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1961 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1963 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1964 newBehavior instanceof Leader);
1968 public void testLaggingFollowerStarvation() {
1969 logStart("testLaggingFollowerStarvation");
1971 String leaderActorId = actorFactory.generateActorId("leader");
1972 String follower1ActorId = actorFactory.generateActorId("follower");
1973 String follower2ActorId = actorFactory.generateActorId("follower");
1975 final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1976 final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1978 MockRaftActorContext leaderActorContext =
1979 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1981 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1982 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1983 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1985 leaderActorContext.setConfigParams(configParams);
1987 leaderActorContext.setReplicatedLog(
1988 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1990 Map<String, String> peerAddresses = new HashMap<>();
1991 peerAddresses.put(follower1ActorId,
1992 follower1Actor.path().toString());
1993 peerAddresses.put(follower2ActorId,
1994 follower2Actor.path().toString());
1996 leaderActorContext.setPeerAddresses(peerAddresses);
1997 leaderActorContext.getTermInformation().update(1, leaderActorId);
1999 leader = createBehavior(leaderActorContext);
2001 leaderActor.underlyingActor().setBehavior(leader);
2003 for (int i = 1; i < 6; i++) {
2004 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
2005 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
2006 new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
2007 assertTrue(newBehavior == leader);
2008 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
2011 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
2012 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
2014 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
2015 heartbeats.size() > 1);
2017 // Check if follower-2 got AppendEntries during this time and was not starved
2018 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
2020 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
2021 appendEntries.size() > 1);
2025 public void testReplicationConsensusWithNonVotingFollower() {
2026 logStart("testReplicationConsensusWithNonVotingFollower");
2028 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2029 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2030 new FiniteDuration(1000, TimeUnit.SECONDS));
2032 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2033 leaderActorContext.setCommitIndex(-1);
2034 leaderActorContext.setLastApplied(-1);
2036 String nonVotingFollowerId = "nonvoting-follower";
2037 ActorRef nonVotingFollowerActor = actorFactory.createActor(
2038 MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
2040 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2041 VotingState.NON_VOTING);
2043 leader = new Leader(leaderActorContext);
2044 leaderActorContext.setCurrentBehavior(leader);
2046 // Ignore initial heartbeats
2047 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2048 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2050 MessageCollectorActor.clearMessages(followerActor);
2051 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2052 MessageCollectorActor.clearMessages(leaderActor);
2054 // Send a Replicate message and wait for AppendEntries.
2055 sendReplicate(leaderActorContext, 0);
2057 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2058 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2060 // Send reply only from the voting follower and verify consensus via ApplyState.
2061 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2063 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2065 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2067 MessageCollectorActor.clearMessages(followerActor);
2068 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2069 MessageCollectorActor.clearMessages(leaderActor);
2071 // Send another Replicate message
2072 sendReplicate(leaderActorContext, 1);
2074 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2075 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2076 AppendEntries.class);
2077 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2078 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2080 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2081 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2083 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2085 // Send reply from the voting follower and verify consensus.
2086 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2088 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2092 public void testTransferLeadershipWithFollowerInSync() {
2093 logStart("testTransferLeadershipWithFollowerInSync");
2095 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2096 leaderActorContext.setLastApplied(-1);
2097 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2098 new FiniteDuration(1000, TimeUnit.SECONDS));
2099 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2101 leader = new Leader(leaderActorContext);
2102 leaderActorContext.setCurrentBehavior(leader);
2104 // Initial heartbeat
2105 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2106 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2107 MessageCollectorActor.clearMessages(followerActor);
2109 sendReplicate(leaderActorContext, 0);
2110 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2112 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2113 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2114 MessageCollectorActor.clearMessages(followerActor);
2116 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2117 leader.transferLeadership(mockTransferCohort);
2119 verify(mockTransferCohort, never()).transferComplete();
2120 doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2121 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2122 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2124 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2125 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2127 // Leader should force an election timeout
2128 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2130 verify(mockTransferCohort).transferComplete();
2134 public void testTransferLeadershipWithEmptyLog() {
2135 logStart("testTransferLeadershipWithEmptyLog");
2137 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2138 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2139 new FiniteDuration(1000, TimeUnit.SECONDS));
2140 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2142 leader = new Leader(leaderActorContext);
2143 leaderActorContext.setCurrentBehavior(leader);
2145 // Initial heartbeat
2146 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2147 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2148 MessageCollectorActor.clearMessages(followerActor);
2150 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2151 doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2152 leader.transferLeadership(mockTransferCohort);
2154 verify(mockTransferCohort, never()).transferComplete();
2155 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2156 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2158 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2159 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2161 // Leader should force an election timeout
2162 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2164 verify(mockTransferCohort).transferComplete();
2168 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2169 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2171 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2172 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2173 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2175 leader = new Leader(leaderActorContext);
2176 leaderActorContext.setCurrentBehavior(leader);
2178 // Initial heartbeat
2179 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2180 MessageCollectorActor.clearMessages(followerActor);
2182 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2183 doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2184 leader.transferLeadership(mockTransferCohort);
2186 verify(mockTransferCohort, never()).transferComplete();
2188 // Sync up the follower.
2189 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2190 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2191 MessageCollectorActor.clearMessages(followerActor);
2193 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2194 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2195 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2196 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2197 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2199 // Leader should force an election timeout
2200 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2202 verify(mockTransferCohort).transferComplete();
2206 public void testTransferLeadershipWithFollowerSyncTimeout() {
2207 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2209 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2210 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2211 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2212 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2213 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2215 leader = new Leader(leaderActorContext);
2216 leaderActorContext.setCurrentBehavior(leader);
2218 // Initial heartbeat
2219 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2220 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2221 MessageCollectorActor.clearMessages(followerActor);
2223 sendReplicate(leaderActorContext, 0);
2224 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2226 MessageCollectorActor.clearMessages(followerActor);
2228 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2229 leader.transferLeadership(mockTransferCohort);
2231 verify(mockTransferCohort, never()).transferComplete();
2233 // Send heartbeats to time out the transfer.
2234 for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2235 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2236 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2237 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2240 verify(mockTransferCohort).abortTransfer();
2241 verify(mockTransferCohort, never()).transferComplete();
2242 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2246 public void testReplicationWithPayloadSizeThatExceedsThreshold() {
2247 logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
2249 final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
2250 List.of(new SimpleReplicatedLogEntry(0, 1,
2251 new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
2252 final MockRaftActorContext.MockPayload largePayload =
2253 new MockRaftActorContext.MockPayload("large", serializedSize);
2255 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2256 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2257 new FiniteDuration(300, TimeUnit.MILLISECONDS));
2258 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
2259 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2260 leaderActorContext.setCommitIndex(-1);
2261 leaderActorContext.setLastApplied(-1);
2263 leader = new Leader(leaderActorContext);
2264 leaderActorContext.setCurrentBehavior(leader);
2266 // Send initial heartbeat reply so follower is marked active
2267 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2268 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2269 MessageCollectorActor.clearMessages(followerActor);
2271 // Send normal payload first to prime commit index.
2272 final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2273 sendReplicate(leaderActorContext, term, 0);
2275 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2276 assertEquals("Entries size", 1, appendEntries.getEntries().size());
2277 assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
2279 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
2280 assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
2281 MessageCollectorActor.clearMessages(followerActor);
2283 // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
2284 sendReplicate(leaderActorContext, term, 1, largePayload);
2286 MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2287 assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
2288 assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
2290 final Identifier slicingId = messageSlice.getIdentifier();
2292 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2293 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
2294 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
2295 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2296 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2297 MessageCollectorActor.clearMessages(followerActor);
2299 // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
2301 // Sleep for the heartbeat interval so AppendEntries is sent.
2302 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2303 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2305 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2307 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2308 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2309 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2310 MessageCollectorActor.clearMessages(followerActor);
2312 // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
2314 leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
2315 messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2316 assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
2318 leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
2320 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
2322 MessageCollectorActor.clearMessages(followerActor);
2324 // Send another normal payload.
2326 sendReplicate(leaderActorContext, term, 2);
2328 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2329 assertEquals("Entries size", 1, appendEntries.getEntries().size());
2330 assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
2331 assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
2335 public void testLargePayloadSlicingExpiration() {
2336 logStart("testLargePayloadSlicingExpiration");
2338 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2339 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2340 new FiniteDuration(100, TimeUnit.MILLISECONDS));
2341 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
2342 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
2343 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2344 leaderActorContext.setCommitIndex(-1);
2345 leaderActorContext.setLastApplied(-1);
2347 final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2348 leader = new Leader(leaderActorContext);
2349 leaderActorContext.setCurrentBehavior(leader);
2351 // Send initial heartbeat reply so follower is marked active
2352 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2353 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2354 MessageCollectorActor.clearMessages(followerActor);
2356 sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
2357 leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
2358 MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2360 // Sleep for at least 3 * election timeout so the slicing state expires.
2361 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2362 .getElectionTimeOutInterval().toMillis() * 3 + 50, TimeUnit.MILLISECONDS);
2363 MessageCollectorActor.clearMessages(followerActor);
2365 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2367 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2368 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2369 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2371 MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
2372 MessageCollectorActor.clearMessages(followerActor);
2374 // Send an AppendEntriesReply - this should restart the slicing.
2376 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2377 .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
2379 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
2381 MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2385 public void testLeaderAddressInAppendEntries() {
2386 logStart("testLeaderAddressInAppendEntries");
2388 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2389 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2390 FiniteDuration.create(50, TimeUnit.MILLISECONDS));
2391 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2392 leaderActorContext.setCommitIndex(-1);
2393 leaderActorContext.setLastApplied(-1);
2395 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
2396 peerId -> leaderActor.path().toString());
2398 leader = new Leader(leaderActorContext);
2399 leaderActorContext.setCurrentBehavior(leader);
2401 // Initial heartbeat shouldn't have the leader address
2403 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2404 assertFalse(appendEntries.getLeaderAddress().isPresent());
2405 MessageCollectorActor.clearMessages(followerActor);
2407 // Send AppendEntriesReply indicating the follower needs the leader address
2409 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
2410 RaftVersions.CURRENT_VERSION));
2412 // Sleep for the heartbeat interval so AppendEntries is sent.
2413 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2414 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2416 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2418 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2419 assertTrue(appendEntries.getLeaderAddress().isPresent());
2420 assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().orElseThrow());
2421 MessageCollectorActor.clearMessages(followerActor);
2423 // Send AppendEntriesReply indicating the follower does not need the leader address
2425 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
2426 RaftVersions.CURRENT_VERSION));
2428 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2429 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2431 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2433 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2434 assertFalse(appendEntries.getLeaderAddress().isPresent());
2438 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
2439 final ActorRef actorRef, final RaftRPC rpc) {
2440 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2441 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2444 private static class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2446 private final long electionTimeOutIntervalMillis;
2447 private final int snapshotChunkSize;
2449 MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
2450 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2451 this.snapshotChunkSize = snapshotChunkSize;
2455 public FiniteDuration getElectionTimeOutInterval() {
2456 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2460 public int getSnapshotChunkSize() {
2461 return snapshotChunkSize;