2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.behaviors;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.protobuf.ByteString;
26 import akka.testkit.TestActorRef;
27 import akka.testkit.javadsl.TestKit;
28 import com.google.common.io.ByteSource;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.io.OutputStream;
32 import java.util.Arrays;
33 import java.util.HashMap;
34 import java.util.List;
36 import java.util.Optional;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.apache.commons.lang3.SerializationUtils;
40 import org.junit.After;
41 import org.junit.Test;
42 import org.opendaylight.controller.cluster.messaging.MessageSlice;
43 import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
44 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
45 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
46 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
47 import org.opendaylight.controller.cluster.raft.RaftActorContext;
48 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
49 import org.opendaylight.controller.cluster.raft.RaftState;
50 import org.opendaylight.controller.cluster.raft.RaftVersions;
51 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
52 import org.opendaylight.controller.cluster.raft.VotingState;
53 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
54 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
55 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
56 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
57 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
58 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
59 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
60 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
61 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
62 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
63 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
64 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
65 import org.opendaylight.controller.cluster.raft.messages.Payload;
66 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
67 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
68 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
69 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
70 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
71 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
72 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
73 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
74 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
75 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
76 import org.opendaylight.yangtools.concepts.Identifier;
77 import scala.concurrent.duration.FiniteDuration;
79 public class LeaderTest extends AbstractLeaderTest<Leader> {
// Peer id used for the follower in the AppendEntriesReply messages these tests construct.
81 static final String FOLLOWER_ID = "follower";
82 public static final String LEADER_ID = "leader";
// Test actors built from ForwardMessageToBehaviorActor props. The tests pass them to
// Leader.handleMessage() as senders and read collected messages back via MessageCollectorActor.
84 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
85 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
87 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
88 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
// Behavior under test; each test assigns a newly constructed Leader.
90 private Leader leader;
// Payload version set on the actor context and expected back from AppendEntries.getPayloadVersion().
91 private final short payloadVersion = 5;
// NOTE(review): the body of this method is not visible in this view. Presumably the JUnit
// teardown hook (org.junit.After is imported) - confirm against the full file.
95 public void tearDown() {
/**
 * Asserts that Leader.handleMessage() returns null for the String "foo", which the test treats
 * as an unknown message.
 */
104 public void testHandleMessageForUnknownMessage() {
105 logStart("testHandleMessageForUnknownMessage");
107 leader = new Leader(createActorContext());
109 // handleMessage should return null when it receives an unknown message
110 assertNull(leader.handleMessage(followerActor, "foo"));
/**
 * Checks the AppendEntries traffic a new Leader sends to its follower: first an immediate,
 * empty heartbeat, then - after a simulated reply and an elapsed heartbeat interval - an
 * AppendEntries carrying the one log entry the follower has not yet acknowledged.
 */
114 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
115 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
117 MockRaftActorContext actorContext = createActorContextWithFollower();
118 actorContext.setCommitIndex(-1);
119 actorContext.setPayloadVersion(payloadVersion);
122 actorContext.getTermInformation().update(term, "");
124 leader = new Leader(actorContext);
125 actorContext.setCurrentBehavior(leader);
127 // Leader should send an immediate heartbeat with no entries as follower is inactive.
128 final long lastIndex = actorContext.getReplicatedLog().lastIndex();
129 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
130 assertEquals("getTerm", term, appendEntries.getTerm());
131 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
132 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
133 assertEquals("Entries size", 0, appendEntries.getEntries().size());
134 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
136 // The follower would normally reply - simulate that explicitly here.
// The reply reports lastIndex - 1 as the follower's last log index, so the entry at lastIndex
// is still outstanding; the assertions after the heartbeat below expect exactly that entry.
137 leader.handleMessage(followerActor, new AppendEntriesReply(
138 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
139 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the messages collected so far so the next expectFirstMatching sees only new ones.
141 followerActor.underlyingActor().clear();
143 // Sleep for the heartbeat interval so AppendEntries is sent.
144 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
145 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
147 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
149 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
150 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
151 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
152 assertEquals("Entries size", 1, appendEntries.getEntries().size());
153 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
154 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
155 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
/**
 * Convenience overload: replicates a new entry at {@code index} using term 1.
 *
 * @return whatever the (actorContext, term, index) overload returns
 */
159 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
160 return sendReplicate(actorContext, 1, index);
/**
 * Overload that supplies a MockPayload("foo") as the payload of the new entry and delegates to
 * the four-argument overload.
 * NOTE(review): the continuation of the parameter list is not visible in this view.
 */
163 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term,
165 return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
/**
 * Appends a new SimpleReplicatedLogEntry(index, term, payload) to the context's replicated log
 * and hands a Replicate message for that entry to the leader, with leaderActor as the sender.
 *
 * @return the behavior returned by leader.handleMessage()
 */
168 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
169 final Payload payload) {
170 SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
171 actorContext.getReplicatedLog().append(newEntry);
// The first two Replicate arguments are null here; another test in this file passes an actor
// and an Identifier in those positions.
172 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
/**
 * After the follower has acknowledged the leader's current last index, sends one Replicate and
 * asserts that the follower receives an AppendEntries carrying exactly the new entry, that the
 * behavior is still a Leader, and that the commit index equals lastIndex.
 */
176 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
177 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
179 MockRaftActorContext actorContext = createActorContextWithFollower();
182 actorContext.getTermInformation().update(term, "");
184 leader = new Leader(actorContext);
186 // Leader will send an immediate heartbeat - ignore it.
187 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
189 // The follower would normally reply - simulate that explicitly here.
190 long lastIndex = actorContext.getReplicatedLog().lastIndex();
191 leader.handleMessage(followerActor, new AppendEntriesReply(
192 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
193 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only messages caused by the Replicate are inspected.
195 followerActor.underlyingActor().clear();
197 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
199 // State should not change
200 assertTrue(raftBehavior instanceof Leader);
202 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
203 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
204 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
205 assertEquals("Entries size", 1, appendEntries.getEntries().size());
206 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
207 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
208 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// No reply for the new entry has been simulated; the commit index is expected at lastIndex,
// not lastIndex + 1.
209 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
/**
 * Covers the Raft rule on entries from earlier terms: with the term bumped to prevTerm + 1, a
 * successful reply for the existing entries must leave the commit index at -1, and only a
 * successful reply for a newly replicated entry of the new term moves it to that new index.
 */
213 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() {
214 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
216 MockRaftActorContext actorContext = createActorContextWithFollower();
217 actorContext.setCommitIndex(-1);
218 actorContext.setLastApplied(-1);
220 // The raft context is initialized with a couple log entries. However the commitIndex
221 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
222 // committed and applied. Now it regains leadership with a higher term (2).
223 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
224 long newTerm = prevTerm + 1;
225 actorContext.getTermInformation().update(newTerm, "");
227 leader = new Leader(actorContext);
228 actorContext.setCurrentBehavior(leader);
230 // Leader will send an immediate heartbeat - ignore it.
231 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
233 // The follower replies with the leader's current last index and term, simulating that it is
234 // up to date with the leader.
235 long lastIndex = actorContext.getReplicatedLog().lastIndex();
236 leader.handleMessage(followerActor, new AppendEntriesReply(
237 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
239 // The commit index should not get updated even though consensus was reached. This is b/c the
240 // last entry's term does not match the current term. As per §5.4.1, "Raft never commits log entries
241 // from previous terms by counting replicas".
242 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
244 followerActor.underlyingActor().clear();
246 // Now replicate a new entry with the new term 2.
247 long newIndex = lastIndex + 1;
248 sendReplicate(actorContext, newTerm, newIndex);
250 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
251 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
252 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
253 assertEquals("Entries size", 1, appendEntries.getEntries().size());
254 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
255 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
256 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
258 // The follower replies with success. The leader should now update the commit index to the new index
259 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
260 // prior entries are committed indirectly".
261 leader.handleMessage(followerActor, new AppendEntriesReply(
262 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
264 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
/**
 * Same flow as the plain Replicate test, but with the RaftPolicy returned by
 * createRaftPolicy(true, true) installed (helper not visible here). The final assertion expects
 * the commit index to already be lastIndex + 1 although no follower reply for the new entry
 * has been simulated.
 */
268 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() {
269 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
271 MockRaftActorContext actorContext = createActorContextWithFollower();
272 actorContext.setRaftPolicy(createRaftPolicy(true, true));
275 actorContext.getTermInformation().update(term, "");
277 leader = new Leader(actorContext);
279 // Leader will send an immediate heartbeat - ignore it.
280 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
282 // The follower would normally reply - simulate that explicitly here.
283 long lastIndex = actorContext.getReplicatedLog().lastIndex();
284 leader.handleMessage(followerActor, new AppendEntriesReply(
285 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
286 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only messages caused by the Replicate are inspected.
288 followerActor.underlyingActor().clear();
290 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
292 // State should not change
293 assertTrue(raftBehavior instanceof Leader);
295 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
296 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
297 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
298 assertEquals("Entries size", 1, appendEntries.getEntries().size());
299 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
300 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
301 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// Unlike the default-policy test, the commit index is expected to include the new entry here.
302 assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
/**
 * Sends five Replicate messages back to back without any follower reply in between and asserts
 * that the follower collected exactly one AppendEntries. The heartbeat interval is overridden
 * to 5 seconds so a heartbeat cannot account for additional messages during the test.
 */
306 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
// Log under this test's own name (previously logged another test's name by copy-paste).
307 logStart("testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent");
309 MockRaftActorContext actorContext = createActorContextWithFollower();
310 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
312 public FiniteDuration getHeartBeatInterval() {
313 return FiniteDuration.apply(5, TimeUnit.SECONDS);
318 actorContext.getTermInformation().update(term, "");
320 leader = new Leader(actorContext);
322 // Leader will send an immediate heartbeat - ignore it.
323 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
325 // The follower would normally reply - simulate that explicitly here.
326 long lastIndex = actorContext.getReplicatedLog().lastIndex();
327 leader.handleMessage(followerActor, new AppendEntriesReply(
328 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
329 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only AppendEntries caused by the Replicates are counted.
331 followerActor.underlyingActor().clear();
333 for (int i = 0; i < 5; i++) {
334 sendReplicate(actorContext, lastIndex + i + 1);
337 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
338 // We expect only 1 message to be sent because of two reasons,
339 // - an append entries reply was not received
340 // - the heartbeat interval has not expired
341 // In this scenario if multiple messages are sent they would likely be duplicates
342 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
/**
 * Replicates three entries, simulating a successful follower reply after each, and expects six
 * AppendEntries in total (checked pairwise by assertRequestEntry / assertCommitEntry). A fourth
 * Replicate adds a seventh message; a fifth Replicate with no reply in between adds none.
 */
346 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
347 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
349 MockRaftActorContext actorContext = createActorContextWithFollower();
350 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
352 public FiniteDuration getHeartBeatInterval() {
353 return FiniteDuration.apply(5, TimeUnit.SECONDS);
358 actorContext.getTermInformation().update(term, "");
360 leader = new Leader(actorContext);
362 // Leader will send an immediate heartbeat - ignore it.
363 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
365 // The follower would normally reply - simulate that explicitly here.
366 long lastIndex = actorContext.getReplicatedLog().lastIndex();
367 leader.handleMessage(followerActor, new AppendEntriesReply(
368 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
369 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
371 followerActor.underlyingActor().clear();
// Each iteration replicates one entry and immediately acknowledges it on the follower's behalf.
373 for (int i = 0; i < 3; i++) {
374 sendReplicate(actorContext, lastIndex + i + 1);
375 leader.handleMessage(followerActor, new AppendEntriesReply(
376 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
379 // We are expecting six messages here -- a request to replicate and a consensus-reached message
380 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
381 assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
382 for (int i = 0; i < 3; i++) {
383 assertRequestEntry(lastIndex, allMessages, i);
384 assertCommitEntry(lastIndex, allMessages, i);
387 // Now perform another commit, eliciting a request to persist
388 sendReplicate(actorContext, lastIndex + 3 + 1);
389 allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
390 // This elicits another message for request to replicate
391 assertEquals("The number of request entries collected", 7, allMessages.size());
392 assertRequestEntry(lastIndex, allMessages, 3);
// No reply was simulated for the previous entry, so this Replicate is expected to add no
// further AppendEntries - the count stays at 7.
394 sendReplicate(actorContext, lastIndex + 4 + 1);
395 allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
396 assertEquals("The number of request entries collected", 7, allMessages.size());
/**
 * Checks the message at position 2 * messageNr + 1 of {@code allMessages}: its leaderCommit
 * must equal lastIndex + messageNr + 1 and it must carry no entries.
 *
 * @param lastIndex   log index the caller started from
 * @param allMessages AppendEntries collected by the follower, in order
 * @param messageNr   zero-based number of the request/commit pair to check
 */
399 private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
400 final int messageNr) {
401 final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
402 assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
403 assertEquals(List.of(), commitReq.getEntries());
/**
 * Checks the message at position 2 * messageNr of {@code allMessages}: its leaderCommit must
 * equal lastIndex + messageNr and it must carry exactly one entry whose index is
 * lastIndex + messageNr + 1.
 *
 * @param lastIndex   log index the caller started from
 * @param allMessages AppendEntries collected by the follower, in order
 * @param messageNr   zero-based number of the request/commit pair to check
 */
406 private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
407 final int messageNr) {
408 final AppendEntries req = allMessages.get(2 * messageNr);
409 assertEquals(lastIndex + messageNr, req.getLeaderCommit());
411 final List<ReplicatedLogEntry> entries = req.getEntries();
412 assertEquals(1, entries.size());
// Derive the expected index from lastIndex rather than hard-coding an offset that only holds
// when lastIndex == 1; this mirrors the leaderCommit expectation in assertCommitEntry.
413 assertEquals(lastIndex + messageNr + 1, entries.get(0).getIndex());
/**
 * Replicates one entry, waits longer than the (500 ms) heartbeat interval without simulating a
 * follower reply, then triggers a heartbeat. Expects two AppendEntries, each carrying the same
 * single entry at lastIndex + 1, i.e. the heartbeat re-sends the unacknowledged entry.
 */
417 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
418 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
420 MockRaftActorContext actorContext = createActorContextWithFollower();
421 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
423 public FiniteDuration getHeartBeatInterval() {
424 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
429 actorContext.getTermInformation().update(term, "");
431 leader = new Leader(actorContext);
433 // Leader will send an immediate heartbeat - ignore it.
434 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
436 // The follower would normally reply - simulate that explicitly here.
437 long lastIndex = actorContext.getReplicatedLog().lastIndex();
438 leader.handleMessage(followerActor, new AppendEntriesReply(
439 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
440 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only the two messages of interest are collected.
442 followerActor.underlyingActor().clear();
444 sendReplicate(actorContext, lastIndex + 1);
446 // Wait slightly longer than heartbeat duration
447 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
449 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
451 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
452 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
454 assertEquals(1, allMessages.get(0).getEntries().size());
455 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
456 assertEquals(1, allMessages.get(1).getEntries().size());
// Verify the entry index of the second message (index 1); message 0 is already checked above.
457 assertEquals(lastIndex + 1, allMessages.get(1).getEntries().get(0).getIndex());
/**
 * With a 100 ms heartbeat interval, triggers SendHeartBeat three times, sleeping 150 ms before
 * each, and expects the follower to have collected exactly three AppendEntries.
 */
462 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
463 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
465 MockRaftActorContext actorContext = createActorContextWithFollower();
466 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
468 public FiniteDuration getHeartBeatInterval() {
469 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
474 actorContext.getTermInformation().update(term, "");
476 leader = new Leader(actorContext);
478 // Leader will send an immediate heartbeat - ignore it.
479 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
481 // The follower would normally reply - simulate that explicitly here.
482 long lastIndex = actorContext.getReplicatedLog().lastIndex();
483 leader.handleMessage(followerActor, new AppendEntriesReply(
484 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
485 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only the three triggered below are counted.
487 followerActor.underlyingActor().clear();
489 for (int i = 0; i < 3; i++) {
// 150 ms exceeds the configured 100 ms heartbeat interval.
490 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
491 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
494 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
495 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
/**
 * Triggers a heartbeat and then immediately a Replicate. Expects two AppendEntries at the
 * follower: the first with no entries (the heartbeat) and the second with one entry.
 */
499 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
500 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
502 MockRaftActorContext actorContext = createActorContextWithFollower();
503 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
505 public FiniteDuration getHeartBeatInterval() {
506 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
511 actorContext.getTermInformation().update(term, "");
513 leader = new Leader(actorContext);
515 // Leader will send an immediate heartbeat - ignore it.
516 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
518 // The follower would normally reply - simulate that explicitly here.
519 long lastIndex = actorContext.getReplicatedLog().lastIndex();
520 leader.handleMessage(followerActor, new AppendEntriesReply(
521 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
522 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only the two messages of interest are collected.
524 followerActor.underlyingActor().clear();
// 150 ms exceeds the configured 100 ms heartbeat interval.
526 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
527 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
528 sendReplicate(actorContext, lastIndex + 1);
530 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
531 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
533 assertEquals(0, allMessages.get(0).getEntries().size());
534 assertEquals(1, allMessages.get(1).getEntries().size());
/**
 * Uses a context from createActorContext() (as opposed to createActorContextWithFollower()),
 * appends one new entry and sends a Replicate for it. Expects the behavior to stay Leader, the
 * commit index to reach the new entry's index, and ApplyState messages at leaderActor for every
 * index from 1 up to the new index, the last one carrying the Replicate's identifier.
 */
539 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
540 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
542 MockRaftActorContext actorContext = createActorContext();
544 leader = new Leader(actorContext);
546 actorContext.setLastApplied(0);
548 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
549 long term = actorContext.getTermInformation().getCurrentTerm();
550 ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
551 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
553 actorContext.getReplicatedLog().append(newEntry);
555 final Identifier id = new MockIdentifier("state-id");
556 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
557 new Replicate(leaderActor, id, newEntry, true));
559 // State should not change
560 assertTrue(raftBehavior instanceof Leader);
562 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
564 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
565 // one since lastApplied state is 0.
566 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
567 leaderActor, ApplyState.class);
568 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
// ApplyState i is expected to refer to log index i + 1, since lastApplied was set to 0 above.
570 for (int i = 0; i <= newLogIndex - 1; i++) {
571 ApplyState applyState = applyStateList.get(i);
572 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
573 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
576 ApplyState last = applyStateList.get((int) newLogIndex - 1);
577 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
578 assertEquals("getIdentifier", id, last.getIdentifier());
/**
 * Exercises heartbeats while a snapshot install to the follower is in progress. With a
 * LeaderInstallSnapshotState attached to the follower and its chunk index incremented, the
 * first heartbeat is expected to yield an AppendEntries with no entries; after
 * fts.markSendStatus(true) the next heartbeat is expected to yield an InstallSnapshot whose
 * lastIncludedIndex equals commitIndex.
 */
582 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
583 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
585 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Empty the leader's log; the state below is described via snapshot index/term instead.
588 actorContext.getReplicatedLog().removeFrom(0);
590 final int commitIndex = 3;
591 final int snapshotIndex = 2;
592 final int snapshotTerm = 1;
594 // set the snapshot variables in replicatedlog
595 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
596 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
597 actorContext.setCommitIndex(commitIndex);
598 //set follower timeout to 2 mins, helps during debugging
599 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
601 leader = new Leader(actorContext);
603 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
604 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
606 //update follower timestamp
607 leader.markFollowerActive(FOLLOWER_ID);
// Build the snapshot bytes once and use them both for the leader's SnapshotHolder and for the
// follower's install-snapshot state.
609 ByteString bs = toByteString(Map.of("1", "A", "2", "B", "3", "C"));
610 leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
611 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
612 -1, null, null), ByteSource.wrap(bs.toByteArray())));
613 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
614 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
615 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
616 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
618 //send first chunk and no InstallSnapshotReply received yet
620 fts.incrementChunkIndex();
622 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
623 TimeUnit.MILLISECONDS);
625 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
627 AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
629 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
631 //InstallSnapshotReply received
632 fts.markSendStatus(true);
634 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
636 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
638 assertEquals(commitIndex, is.getLastIncludedIndex());
/**
 * Empties the log, sets snapshot index 3 / term 1 and commit index 2, appends a new entry at
 * index 4 and sends a Replicate for it. Expects the behavior to stay Leader and the snapshot
 * manager to report that a capture is in progress.
 */
642 public void testSendAppendEntriesSnapshotScenario() {
643 logStart("testSendAppendEntriesSnapshotScenario");
645 final MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): leadersSnapshot is populated here but not referenced by any other line visible
// in this view - confirm whether it is still needed.
647 Map<String, String> leadersSnapshot = new HashMap<>();
648 leadersSnapshot.put("1", "A");
649 leadersSnapshot.put("2", "B");
650 leadersSnapshot.put("3", "C");
653 actorContext.getReplicatedLog().removeFrom(0);
655 final int followersLastIndex = 2;
656 final int snapshotIndex = 3;
657 final int newEntryIndex = 4;
658 final int snapshotTerm = 1;
659 final int currentTerm = 2;
661 // set the snapshot variables in replicatedlog
662 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
663 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
664 actorContext.setCommitIndex(followersLastIndex);
666 leader = new Leader(actorContext);
668 // Leader will send an immediate heartbeat - ignore it.
669 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
672 SimpleReplicatedLogEntry entry =
673 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
674 new MockRaftActorContext.MockPayload("D"));
676 actorContext.getReplicatedLog().append(entry);
678 //update follower timestamp
679 leader.markFollowerActive(FOLLOWER_ID);
681 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
682 RaftActorBehavior raftBehavior = leader.handleMessage(
683 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
685 assertTrue(raftBehavior instanceof Leader);
687 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
/**
 * With no snapshot held by the leader (setSnapshotHolder(null)), a Replicate for a new entry at
 * index 4 is expected to start a snapshot capture whose CaptureSnapshot reports last applied
 * index/term 3/1 and last index/term 4/2. A second Replicate while that capture is in progress
 * must leave the same CaptureSnapshot instance in place.
 */
691 public void testInitiateInstallSnapshot() {
692 logStart("testInitiateInstallSnapshot");
694 MockRaftActorContext actorContext = createActorContextWithFollower();
// Empty the leader's log; the state below is described via snapshot index/term instead.
697 actorContext.getReplicatedLog().removeFrom(0);
699 final int followersLastIndex = 2;
700 final int snapshotIndex = 3;
701 final int newEntryIndex = 4;
702 final int snapshotTerm = 1;
703 final int currentTerm = 2;
705 // set the snapshot variables in replicatedlog
706 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
707 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
708 actorContext.setLastApplied(3);
709 actorContext.setCommitIndex(followersLastIndex);
711 leader = new Leader(actorContext);
713 // Leader will send an immediate heartbeat - ignore it.
714 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
716 // set the snapshot as absent and check if capture-snapshot is invoked.
717 leader.setSnapshotHolder(null);
720 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
721 new MockRaftActorContext.MockPayload("D"));
723 actorContext.getReplicatedLog().append(entry);
725 //update follower timestamp
726 leader.markFollowerActive(FOLLOWER_ID);
728 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
730 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
732 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
// Expected values mirror the setup above: lastApplied 3 / snapshotTerm 1, new entry 4 / term 2.
734 assertEquals(3, cs.getLastAppliedIndex());
735 assertEquals(1, cs.getLastAppliedTerm());
736 assertEquals(4, cs.getLastIndex());
737 assertEquals(2, cs.getLastTerm());
739 // if an initiate is started again when first is in progress, it shouldn't initiate Capture
740 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
742 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
746 public void testInitiateForceInstallSnapshot() throws Exception {
// Scenario: an AppendEntriesReply that asks for a forced snapshot install must make the leader start exactly
// one snapshot capture. Further triggers (Replicate or another such reply) while the capture or the chunk
// transfer is in flight must neither start a second capture nor re-send the chunk.
747 logStart("testInitiateForceInstallSnapshot");
749 MockRaftActorContext actorContext = createActorContextWithFollower();
751 final int followersLastIndex = 2;
752 final int snapshotIndex = -1;
753 final int newEntryIndex = 4;
754 final int snapshotTerm = -1;
755 final int currentTerm = 2;
757 // Set the snapshot index and term on the replicated log (both -1 in this test).
758 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
759 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
760 actorContext.setLastApplied(3);
761 actorContext.setCommitIndex(followersLastIndex);
// Remove log entries from index 0 onward; entries 0..4 are appended explicitly further down.
763 actorContext.getReplicatedLog().removeFrom(0);
// Record whatever the context passes to the create-snapshot procedure so the test can inspect it and
// write the snapshot bytes to the stream itself.
765 AtomicReference<Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
766 actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
768 leader = new Leader(actorContext);
769 actorContext.setCurrentBehavior(leader);
771 // Leader will send an immediate heartbeat - ignore it.
772 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
774 // Clear the held snapshot so that a capture has to be initiated.
775 leader.setSnapshotHolder(null);
// Append entries with indexes 0..3, all in term 1.
777 for (int i = 0; i < 4; i++) {
778 actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
779 new MockRaftActorContext.MockPayload("X" + i)));
// Append the newest entry: index 4 in term 2.
783 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
784 new MockRaftActorContext.MockPayload("D"));
786 actorContext.getReplicatedLog().append(entry);
788 // Update the follower's activity timestamp.
789 leader.markFollowerActive(FOLLOWER_ID);
791 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
792 // installed with a SendInstallSnapshot
793 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
794 RaftVersions.CURRENT_VERSION));
796 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
// The capture must reflect lastApplied = 3 (an entry appended in term 1) and the last log entry (index 4, term 2).
798 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
799 assertEquals(3, cs.getLastAppliedIndex());
800 assertEquals(1, cs.getLastAppliedTerm());
801 assertEquals(4, cs.getLastIndex());
802 assertEquals(2, cs.getLastTerm());
804 assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
805 assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
807 MessageCollectorActor.clearMessages(followerActor);
809 // Sending Replicate message should not initiate another capture since the first is in progress.
810 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
811 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
813 // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
814 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
815 RaftVersions.CURRENT_VERSION));
816 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
818 // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
819 final byte[] bytes = new byte[]{1, 2, 3};
820 installSnapshotStream.get().get().write(bytes);
821 actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
822 Runtime.getRuntime().totalMemory());
823 MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
825 // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
826 MessageCollectorActor.clearMessages(followerActor);
827 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
828 RaftVersions.CURRENT_VERSION));
// Expect that no InstallSnapshot reaches the follower (200 is presumably a wait in milliseconds - confirm).
829 MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
834 public void testInstallSnapshot() {
// Scenario: handling a SendInstallSnapshot message must keep the behavior a Leader and send the follower an
// InstallSnapshot carrying the snapshot's index/term and the leader's current term.
835 logStart("testInstallSnapshot");
837 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Arbitrary key/value content that is serialized below to act as the snapshot state.
839 Map<String, String> leadersSnapshot = new HashMap<>();
840 leadersSnapshot.put("1", "A");
841 leadersSnapshot.put("2", "B");
842 leadersSnapshot.put("3", "C");
// Remove log entries from index 0 onward.
845 actorContext.getReplicatedLog().removeFrom(0);
847 final int lastAppliedIndex = 3;
848 final int snapshotIndex = 2;
849 final int snapshotTerm = 1;
850 final int currentTerm = 2;
852 // Set the snapshot index/term on the replicated log and record the current term.
853 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
854 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
855 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
856 actorContext.setCommitIndex(lastAppliedIndex);
857 actorContext.setLastApplied(lastAppliedIndex);
859 leader = new Leader(actorContext);
861 // Initial heartbeat.
862 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Reset the leader's view of the follower: nothing matched yet, next index 0.
864 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
865 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
867 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
868 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(),
869 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
871 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
872 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
874 assertTrue(raftBehavior instanceof Leader);
876 // Check that the InstallSnapshot sent to the follower carries the expected values.
878 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
879 InstallSnapshot.class);
881 assertNotNull(installSnapshot.getData());
882 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
883 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
885 assertEquals(currentTerm, installSnapshot.getTerm());
889 public void testForceInstallSnapshot() {
// Scenario: same flow as a regular SendInstallSnapshot, but with no prior snapshot on the log
// (snapshot index/term of -1) and the follower's next index set to -1.
890 logStart("testForceInstallSnapshot");
892 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Arbitrary key/value content that is serialized below to act as the snapshot state.
894 Map<String, String> leadersSnapshot = new HashMap<>();
895 leadersSnapshot.put("1", "A");
896 leadersSnapshot.put("2", "B");
897 leadersSnapshot.put("3", "C");
899 final int lastAppliedIndex = 3;
900 final int snapshotIndex = -1;
901 final int snapshotTerm = -1;
902 final int currentTerm = 2;
904 // Set the snapshot index/term on the replicated log and record the current term.
905 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
906 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
907 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
908 actorContext.setCommitIndex(lastAppliedIndex);
909 actorContext.setLastApplied(lastAppliedIndex);
911 leader = new Leader(actorContext);
913 // Initial heartbeat.
914 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Reset the leader's view of the follower: match index and next index both -1.
916 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
917 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
919 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
920 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(),
921 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
923 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
924 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
926 assertTrue(raftBehavior instanceof Leader);
928 // Check that the InstallSnapshot sent to the follower carries the expected values.
930 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
931 InstallSnapshot.class);
933 assertNotNull(installSnapshot.getData());
934 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
935 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
937 assertEquals(currentTerm, installSnapshot.getTerm());
941 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
// Scenario: a successful InstallSnapshotReply for the last chunk must clear the follower's install-snapshot
// state, set its match index to the snapshot's index (next index one past it) and drop the leader's snapshot.
942 logStart("testHandleInstallSnapshotReplyLastChunk");
944 MockRaftActorContext actorContext = createActorContextWithFollower();
946 final int commitIndex = 3;
947 final int snapshotIndex = 2;
948 final int snapshotTerm = 1;
949 final int currentTerm = 2;
951 actorContext.setCommitIndex(commitIndex);
953 leader = new Leader(actorContext);
954 actorContext.setCurrentBehavior(leader);
// Reset the leader's view of the follower: nothing matched yet, next index 0.
956 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
957 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
959 // Ignore initial heartbeat.
960 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Arbitrary key/value content that is serialized below to act as the snapshot state.
962 Map<String, String> leadersSnapshot = new HashMap<>();
963 leadersSnapshot.put("1", "A");
964 leadersSnapshot.put("2", "B");
965 leadersSnapshot.put("3", "C");
967 // Set the snapshot index/term on the replicated log and record the current term.
969 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
970 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
971 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
973 ByteString bs = toByteString(leadersSnapshot);
// Give the leader a snapshot to install and attach a per-follower install state over the same bytes.
974 leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
975 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
976 -1, null, null), ByteSource.wrap(bs.toByteArray())));
977 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
978 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
979 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
980 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
// Advance the install state until its current chunk index is the last chunk.
// NOTE(review): a loop-body line (source line 982) is not visible in this excerpt - confirm what else runs here.
981 while (!fts.isLastChunk(fts.getChunkIndex())) {
983 fts.incrementChunkIndex();
// Remove log entries from index 0 onward.
987 actorContext.getReplicatedLog().removeFrom(0);
// Reply with success for the current (last) chunk index.
989 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
990 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
992 assertTrue(raftBehavior instanceof Leader);
994 assertEquals(1, leader.followerLogSize());
995 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
997 assertNull(fli.getInstallSnapshotState());
998 assertEquals(commitIndex, fli.getMatchIndex());
999 assertEquals(commitIndex + 1, fli.getNextIndex());
1000 assertFalse(leader.hasSnapshot());
1004 public void testSendSnapshotfromInstallSnapshotReply() {
// Scenario: each successful InstallSnapshotReply makes the leader send the next chunk; a reply received
// after the final chunk has been acknowledged must not produce another InstallSnapshot.
1005 logStart("testSendSnapshotfromInstallSnapshotReply");
1007 MockRaftActorContext actorContext = createActorContextWithFollower();
1009 final int commitIndex = 3;
1010 final int snapshotIndex = 2;
1011 final int snapshotTerm = 1;
1012 final int currentTerm = 2;
// Config with an overridden snapshot chunk size.
// NOTE(review): the override's return value is not visible in this excerpt; the assertions below expect
// the snapshot to be split into 3 chunks.
1014 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
1016 public int getSnapshotChunkSize() {
// Long heartbeat and isolated-leader-check intervals - presumably so these timers do not fire during the test.
1020 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1021 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1023 actorContext.setConfigParams(configParams);
1024 actorContext.setCommitIndex(commitIndex);
1026 leader = new Leader(actorContext);
1027 actorContext.setCurrentBehavior(leader);
// Reset the leader's view of the follower: nothing matched yet, next index 0.
1029 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1030 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary key/value content that is serialized below to act as the snapshot state.
1032 Map<String, String> leadersSnapshot = new HashMap<>();
1033 leadersSnapshot.put("1", "A");
1034 leadersSnapshot.put("2", "B");
1035 leadersSnapshot.put("3", "C");
1037 // Set the snapshot index/term on the replicated log and record the current term.
1038 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1039 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1040 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1042 ByteString bs = toByteString(leadersSnapshot);
1043 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1044 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
// Kick off the install: the first chunk (1 of 3) is expected at the follower.
1046 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1048 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1049 InstallSnapshot.class);
1051 assertEquals(1, installSnapshot.getChunkIndex());
1052 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 1 - chunk 2 is expected next.
1054 followerActor.underlyingActor().clear();
1055 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1056 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1058 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1060 assertEquals(2, installSnapshot.getChunkIndex());
1061 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 2 - another InstallSnapshot is expected (its chunk index is not asserted here).
1063 followerActor.underlyingActor().clear();
1064 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1065 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1067 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1069 // Acknowledge the chunk just received and make sure no further InstallSnapshot is sent to the follower.
1070 followerActor.underlyingActor().clear();
1071 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1072 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1074 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1076 assertNull(installSnapshot);
1081 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() {
// Scenario: after an unsuccessful InstallSnapshotReply with chunk index -1, the next heartbeat must make the
// leader send chunk 1 again.
1082 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1084 MockRaftActorContext actorContext = createActorContextWithFollower();
1086 final int commitIndex = 3;
1087 final int snapshotIndex = 2;
1088 final int snapshotTerm = 1;
1089 final int currentTerm = 2;
// Config with an overridden snapshot chunk size.
// NOTE(review): the override's return value is not visible in this excerpt; the assertions below expect
// the snapshot to be split into 3 chunks.
1091 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1093 public int getSnapshotChunkSize() {
1098 actorContext.setCommitIndex(commitIndex);
1100 leader = new Leader(actorContext);
// Reset the leader's view of the follower: nothing matched yet, next index 0.
1102 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1103 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary key/value content that is serialized below to act as the snapshot state.
1105 Map<String, String> leadersSnapshot = new HashMap<>();
1106 leadersSnapshot.put("1", "A");
1107 leadersSnapshot.put("2", "B");
1108 leadersSnapshot.put("3", "C");
1110 // Set the snapshot index/term on the replicated log and record the current term.
1111 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1112 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1113 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1115 ByteString bs = toByteString(leadersSnapshot);
1116 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1117 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
// NOTE(review): the reason for this one-second pause is not evident from the code shown - confirm it is needed.
1119 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1120 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1122 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1123 InstallSnapshot.class);
1125 assertEquals(1, installSnapshot.getChunkIndex());
1126 assertEquals(3, installSnapshot.getTotalChunks());
1128 followerActor.underlyingActor().clear();
// Reply with an invalid chunk index (-1) and success == false.
1130 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1131 FOLLOWER_ID, -1, false));
// Wait one configured heartbeat interval before triggering the heartbeat by hand.
1133 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1134 TimeUnit.MILLISECONDS);
1136 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
// The first chunk is expected to be sent again.
1138 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1140 assertEquals(1, installSnapshot.getChunkIndex());
1141 assertEquals(3, installSnapshot.getTotalChunks());
1145 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() {
// Scenario: the first chunk carries the initial last-chunk hash code constant, and the second chunk carries
// Arrays.hashCode() of the first chunk's data as its last-chunk hash code.
1146 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1148 MockRaftActorContext actorContext = createActorContextWithFollower();
1150 final int commitIndex = 3;
1151 final int snapshotIndex = 2;
1152 final int snapshotTerm = 1;
1153 final int currentTerm = 2;
// Config with an overridden snapshot chunk size.
// NOTE(review): the override's return value is not visible in this excerpt; the assertions below expect
// the snapshot to be split into 3 chunks.
1155 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1157 public int getSnapshotChunkSize() {
1162 actorContext.setCommitIndex(commitIndex);
1164 leader = new Leader(actorContext);
// Reset the leader's view of the follower: nothing matched yet, next index 0.
1166 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1167 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary key/value content that is serialized below to act as the snapshot state.
1169 Map<String, String> leadersSnapshot = new HashMap<>();
1170 leadersSnapshot.put("1", "A");
1171 leadersSnapshot.put("2", "B");
1172 leadersSnapshot.put("3", "C");
1174 // Set the snapshot index/term on the replicated log and record the current term.
1175 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1176 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1177 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1179 ByteString bs = toByteString(leadersSnapshot);
1180 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1181 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
1183 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1185 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1186 InstallSnapshot.class);
1188 assertEquals(1, installSnapshot.getChunkIndex());
1189 assertEquals(3, installSnapshot.getTotalChunks());
1190 assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1191 installSnapshot.getLastChunkHashCode().getAsInt());
// Remember the hash of chunk 1's data; chunk 2 must report it as the previous chunk's hash code.
1193 final int hashCode = Arrays.hashCode(installSnapshot.getData());
1195 followerActor.underlyingActor().clear();
// Acknowledge chunk 1 so the leader sends chunk 2.
1197 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1198 FOLLOWER_ID, 1, true));
1200 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1202 assertEquals(2, installSnapshot.getChunkIndex());
1203 assertEquals(3, installSnapshot.getTotalChunks());
1204 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().getAsInt());
1208 public void testLeaderInstallSnapshotState() throws IOException {
// Exercises LeaderInstallSnapshotState directly with a chunk size of 50: walks the serialized snapshot in
// 50-byte steps and checks each chunk's length, the running chunk index and the total chunk count.
1209 logStart("testLeaderInstallSnapshotState");
// Arbitrary key/value content that is serialized below to act as the snapshot bytes.
1211 Map<String, String> leadersSnapshot = new HashMap<>();
1212 leadersSnapshot.put("1", "A");
1213 leadersSnapshot.put("2", "B");
1214 leadersSnapshot.put("3", "C");
1216 ByteString bs = toByteString(leadersSnapshot);
1217 byte[] barray = bs.toByteArray();
1219 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1220 fts.setSnapshotBytes(ByteSource.wrap(barray));
1222 assertEquals(bs.size(), barray.length);
// NOTE(review): the declaration and per-iteration update of chunkIndex are not visible in this excerpt
// (source lines 1223-1224 and 1227-1228 are skipped) - confirm against the full file.
1225 for (int i = 0; i < barray.length; i = i + 50) {
// Exclusive end offset of this chunk: i + 50, clamped to the array length for the final, shorter chunk.
1226 int length = i + 50;
1229 if (i + 50 > barray.length) {
1230 length = barray.length;
1233 byte[] chunk = fts.getNextChunk();
1234 assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1235 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
// Mark the chunk as sent successfully and advance unless this was the last chunk.
1237 fts.markSendStatus(true);
1238 if (!fts.isLastChunk(chunkIndex)) {
1239 fts.incrementChunkIndex();
1243 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1248 protected Leader createBehavior(final RaftActorContext actorContext) {
// Creates a new Leader behavior on the given context.
1249 return new Leader(actorContext);
1253 protected MockRaftActorContext createActorContext() {
// Creates a context backed by the leaderActor test actor.
1254 return createActorContext(leaderActor);
1258 protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
// Creates a context with the LEADER_ID member id, backed by the given actor.
1259 return createActorContext(LEADER_ID, actorRef);
1262 private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
// Builds a MockRaftActorContext for the given id and actor with a 50 ms heartbeat interval, an election
// timeout factor of 100000 and this test's payloadVersion.
// NOTE(review): the return statement (source line 1269) is not visible in this excerpt.
1263 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1264 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1265 configParams.setElectionTimeoutFactor(100000);
1266 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1267 context.setConfigParams(configParams);
1268 context.setPayloadVersion(payloadVersion);
1272 private MockRaftActorContext createActorContextWithFollower() {
// Creates the default leader context and registers FOLLOWER_ID, at followerActor's path, as its only peer.
1273 MockRaftActorContext actorContext = createActorContext();
1274 actorContext.setPeerAddresses(Map.of(FOLLOWER_ID, followerActor.path().toString()));
1275 return actorContext;
1278 private MockRaftActorContext createFollowerActorContextWithLeader() {
// Creates a context for FOLLOWER_ID backed by followerActor, with LEADER_ID (at leaderActor's path) as its
// only peer.
1279 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Replace the config installed by createActorContext() with a fresh one that only sets the election
// timeout factor (10000).
1280 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1281 followerConfig.setElectionTimeoutFactor(10000);
1282 followerActorContext.setConfigParams(followerConfig);
1283 followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString()));
1284 return followerActorContext;
1288 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() {
// Scenario: leader and follower hold identical logs and both have commit index 1, below the log's last
// index. Checks the leader's initial AppendEntries and the follower's reply to it.
1289 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1291 final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1293 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Install a real Follower behavior on the follower actor.
1295 Follower follower = new Follower(followerActorContext);
1296 followerActor.underlyingActor().setBehavior(follower);
1297 followerActorContext.setCurrentBehavior(follower);
// NOTE(review): createActorContextWithFollower() above already registered this same peer address.
1299 Map<String, String> peerAddresses = new HashMap<>();
1300 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1302 leaderActorContext.setPeerAddresses(peerAddresses);
// NOTE(review): the log cleared here is replaced wholesale by setReplicatedLog() in the next statement.
1304 leaderActorContext.getReplicatedLog().removeFrom(0);
1307 leaderActorContext.setReplicatedLog(
1308 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1310 leaderActorContext.setCommitIndex(1);
1312 followerActorContext.getReplicatedLog().removeFrom(0);
1314 // The follower has exactly the same log entries and the same commit index.
1315 followerActorContext.setReplicatedLog(
1316 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1318 followerActorContext.setCommitIndex(1);
1320 leader = new Leader(leaderActorContext);
1321 leaderActorContext.setCurrentBehavior(leader);
// The initial AppendEntries is expected to carry no entries, leader commit -1 and prevLogIndex 0.
1323 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1325 assertEquals(-1, appendEntries.getLeaderCommit());
1326 assertEquals(0, appendEntries.getEntries().size());
1327 assertEquals(0, appendEntries.getPrevLogIndex());
// The follower's reply must report its last log index (2) and term (1).
1329 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1330 leaderActor, AppendEntriesReply.class);
1332 assertEquals(2, appendEntriesReply.getLogLastIndex());
1333 assertEquals(1, appendEntriesReply.getLogLastTerm());
1335 // follower returns its next index
// NOTE(review): the two assertions below repeat the two immediately above - confirm what was meant here.
1336 assertEquals(2, appendEntriesReply.getLogLastIndex());
1337 assertEquals(1, appendEntriesReply.getLogLastTerm());
1343 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() {
// Scenario: leader and follower hold identical logs but the follower's commit index (2) is ahead of the
// leader's (1). After the leader processes the follower's first reply, the next heartbeat must carry leader
// commit 2 and the follower's commit index must stay at 2.
1344 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1346 final MockRaftActorContext leaderActorContext = createActorContext();
1348 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1349 followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString()));
// Install a real Follower behavior on the follower actor.
1351 Follower follower = new Follower(followerActorContext);
1352 followerActor.underlyingActor().setBehavior(follower);
1353 followerActorContext.setCurrentBehavior(follower);
1355 Map<String, String> leaderPeerAddresses = new HashMap<>();
1356 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1358 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
// NOTE(review): the log cleared here is replaced wholesale by setReplicatedLog() in the next statement.
1360 leaderActorContext.getReplicatedLog().removeFrom(0);
1362 leaderActorContext.setReplicatedLog(
1363 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1365 leaderActorContext.setCommitIndex(1);
1367 followerActorContext.getReplicatedLog().removeFrom(0);
1369 followerActorContext.setReplicatedLog(
1370 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1372 // The follower has the same log entries, but its commit index is greater than the leader's.
1373 followerActorContext.setCommitIndex(2);
1375 leader = new Leader(leaderActorContext);
1377 // Initial heartbeat
1378 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1380 assertEquals(-1, appendEntries.getLeaderCommit());
1381 assertEquals(0, appendEntries.getEntries().size());
1382 assertEquals(0, appendEntries.getPrevLogIndex());
1384 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1385 leaderActor, AppendEntriesReply.class);
1387 assertEquals(2, appendEntriesReply.getLogLastIndex());
1388 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): this installs the Follower behavior on the *leader* actor; sibling tests pass `leader` at
// this point. Possibly a typo - confirm before changing, since the reply is handed to the leader explicitly below.
1390 leaderActor.underlyingActor().setBehavior(follower);
1391 leader.handleMessage(followerActor, appendEntriesReply);
1393 leaderActor.underlyingActor().clear();
1394 followerActor.underlyingActor().clear();
// Wait one configured heartbeat interval before triggering the heartbeat by hand.
1396 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1397 TimeUnit.MILLISECONDS);
1399 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
// The heartbeat is expected to carry leader commit 2, prevLogIndex 2 and no entries.
1401 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1403 assertEquals(2, appendEntries.getLeaderCommit());
1404 assertEquals(0, appendEntries.getEntries().size());
1405 assertEquals(2, appendEntries.getPrevLogIndex());
1407 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1409 assertEquals(2, appendEntriesReply.getLogLastIndex());
1410 assertEquals(1, appendEntriesReply.getLogLastTerm());
1412 assertEquals(2, followerActorContext.getCommitIndex());
1418 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
// Scenario: the follower's log is a shorter prefix of the leader's. After the leader handles the follower's
// reply to the initial AppendEntries, it must send the two missing entries (indexes 1 and 2), and the
// follower must apply both and catch up to commit index 2.
1419 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1421 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// Very long heartbeat interval (1000 s) - presumably so no timer-driven heartbeat interferes with the test.
1422 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1423 new FiniteDuration(1000, TimeUnit.SECONDS));
1425 leaderActorContext.setReplicatedLog(
1426 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1427 long leaderCommitIndex = 2;
1428 leaderActorContext.setCommitIndex(leaderCommitIndex);
1429 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the leader's entries at indexes 1 and 2 for comparison with what the follower receives.
1431 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1432 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1434 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// The follower's log is built with createEntries(0, 1, 1); its commit index and last applied are 0.
1436 followerActorContext.setReplicatedLog(
1437 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1438 followerActorContext.setCommitIndex(0);
1439 followerActorContext.setLastApplied(0);
1441 Follower follower = new Follower(followerActorContext);
1442 followerActor.underlyingActor().setBehavior(follower);
1444 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply to it, then clear both collectors.
1446 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1447 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1448 AppendEntriesReply.class);
1450 MessageCollectorActor.clearMessages(followerActor);
1451 MessageCollectorActor.clearMessages(leaderActor);
1453 // Verify initial AppendEntries sent.
1454 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1455 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1456 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
// From here on the leader actor forwards messages to the Leader behavior.
1458 leaderActor.underlyingActor().setBehavior(leader);
// Hand the captured reply to the leader; it is expected to respond with the missing entries.
1460 leader.handleMessage(followerActor, appendEntriesReply);
1462 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1463 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1465 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1466 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1467 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1469 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1470 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1471 appendEntries.getEntries().get(0).getData());
1472 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1473 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1474 appendEntries.getEntries().get(1).getData());
1476 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1477 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
// The follower must emit one ApplyState per newly received entry, in index order.
1479 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1481 ApplyState applyState = applyStateList.get(0);
1482 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1483 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1484 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1485 applyState.getReplicatedLogEntry().getData());
1487 applyState = applyStateList.get(1);
1488 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1489 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1490 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1491 applyState.getReplicatedLogEntry().getData());
1493 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1494 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1498 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
// Scenario: the follower starts with an empty log. After the leader handles the follower's reply to the
// initial AppendEntries, it must send both of its entries (indexes 0 and 1) with prevLogIndex -1, and the
// follower must apply both and reach commit index 1.
1499 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1501 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// Very long heartbeat interval (1000 s) - presumably so no timer-driven heartbeat interferes with the test.
1502 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1503 new FiniteDuration(1000, TimeUnit.SECONDS));
1505 leaderActorContext.setReplicatedLog(
1506 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1507 long leaderCommitIndex = 1;
1508 leaderActorContext.setCommitIndex(leaderCommitIndex);
1509 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the leader's entries at indexes 0 and 1 for comparison with what the follower receives.
1511 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1512 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1514 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// The follower gets a log built with no entries; its commit index and last applied are -1.
1516 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1517 followerActorContext.setCommitIndex(-1);
1518 followerActorContext.setLastApplied(-1);
1520 Follower follower = new Follower(followerActorContext);
1521 followerActor.underlyingActor().setBehavior(follower);
1522 followerActorContext.setCurrentBehavior(follower);
1524 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply to it, then clear both collectors.
1526 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1527 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1528 AppendEntriesReply.class);
1530 MessageCollectorActor.clearMessages(followerActor);
1531 MessageCollectorActor.clearMessages(leaderActor);
1533 // Verify the initial AppendEntries.
// NOTE(review): the asserted leader commit is -1 although the leader's commit index was set to 1 above.
1534 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1535 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1536 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
// From here on the leader actor forwards messages to the Leader behavior.
1538 leaderActor.underlyingActor().setBehavior(leader);
1539 leaderActorContext.setCurrentBehavior(leader);
// Hand the captured reply to the leader; it is expected to respond with its whole log.
1541 leader.handleMessage(followerActor, appendEntriesReply);
1543 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1544 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1546 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1547 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1548 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1550 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1551 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1552 appendEntries.getEntries().get(0).getData());
1553 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1554 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1555 appendEntries.getEntries().get(1).getData());
1557 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1558 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
// The follower must emit one ApplyState per newly received entry, in index order.
1560 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1562 ApplyState applyState = applyStateList.get(0);
1563 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1564 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1565 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1566 applyState.getReplicatedLogEntry().getData());
1568 applyState = applyStateList.get(1);
1569 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1570 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1571 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1572 applyState.getReplicatedLogEntry().getData());
1574 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1575 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1579 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1580 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1582 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1583 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1584 new FiniteDuration(1000, TimeUnit.SECONDS));
1586 leaderActorContext.setReplicatedLog(
1587 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1588 long leaderCommitIndex = 1;
1589 leaderActorContext.setCommitIndex(leaderCommitIndex);
1590 leaderActorContext.setLastApplied(leaderCommitIndex);
1592 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1593 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1595 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1597 followerActorContext.setReplicatedLog(
1598 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1599 followerActorContext.setCommitIndex(-1);
1600 followerActorContext.setLastApplied(-1);
1602 Follower follower = new Follower(followerActorContext);
1603 followerActor.underlyingActor().setBehavior(follower);
1604 followerActorContext.setCurrentBehavior(follower);
1606 leader = new Leader(leaderActorContext);
1608 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1609 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1610 AppendEntriesReply.class);
1612 MessageCollectorActor.clearMessages(followerActor);
1613 MessageCollectorActor.clearMessages(leaderActor);
1615 // Verify initial AppendEntries sent with the leader's current commit index.
1616 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1617 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1618 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1620 leaderActor.underlyingActor().setBehavior(leader);
1621 leaderActorContext.setCurrentBehavior(leader);
1623 leader.handleMessage(followerActor, appendEntriesReply);
1625 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1626 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1628 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1629 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1630 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1632 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1633 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1634 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1635 appendEntries.getEntries().get(0).getData());
1636 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1637 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1638 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1639 appendEntries.getEntries().get(1).getData());
1641 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1642 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1644 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1646 ApplyState applyState = applyStateList.get(0);
1647 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1648 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1649 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1650 applyState.getReplicatedLogEntry().getData());
1652 applyState = applyStateList.get(1);
1653 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1654 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1655 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1656 applyState.getReplicatedLogEntry().getData());
1658 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1659 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1660 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1664 public void testHandleAppendEntriesReplyWithNewerTerm() {
1665 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1667 MockRaftActorContext leaderActorContext = createActorContext();
1668 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1669 new FiniteDuration(10000, TimeUnit.SECONDS));
1671 leaderActorContext.setReplicatedLog(
1672 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1674 leader = new Leader(leaderActorContext);
1675 leaderActor.underlyingActor().setBehavior(leader);
1676 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1678 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1679 AppendEntriesReply.class);
1681 assertEquals(false, appendEntriesReply.isSuccess());
1682 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1684 MessageCollectorActor.clearMessages(leaderActor);
1688 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1689 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1691 MockRaftActorContext leaderActorContext = createActorContext();
1692 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1693 new FiniteDuration(10000, TimeUnit.SECONDS));
1695 leaderActorContext.setReplicatedLog(
1696 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1697 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1699 leader = new Leader(leaderActorContext);
1700 leaderActor.underlyingActor().setBehavior(leader);
1701 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1703 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1704 AppendEntriesReply.class);
1706 assertEquals(false, appendEntriesReply.isSuccess());
1707 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1709 MessageCollectorActor.clearMessages(leaderActor);
1713 public void testHandleAppendEntriesReplySuccess() {
1714 logStart("testHandleAppendEntriesReplySuccess");
1716 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1718 leaderActorContext.setReplicatedLog(
1719 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1721 leaderActorContext.setCommitIndex(1);
1722 leaderActorContext.setLastApplied(1);
1723 leaderActorContext.getTermInformation().update(1, "leader");
1725 leader = new Leader(leaderActorContext);
1727 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1729 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1730 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1732 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1734 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1736 assertEquals(RaftState.Leader, raftActorBehavior.state());
1738 assertEquals(2, leaderActorContext.getCommitIndex());
1740 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1741 leaderActor, ApplyJournalEntries.class);
1743 assertEquals(2, leaderActorContext.getLastApplied());
1745 assertEquals(2, applyJournalEntries.getToIndex());
1747 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1750 assertEquals(1,applyStateList.size());
1752 ApplyState applyState = applyStateList.get(0);
1754 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1756 assertEquals(2, followerInfo.getMatchIndex());
1757 assertEquals(3, followerInfo.getNextIndex());
1758 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1759 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1763 public void testHandleAppendEntriesReplyUnknownFollower() {
1764 logStart("testHandleAppendEntriesReplyUnknownFollower");
1766 MockRaftActorContext leaderActorContext = createActorContext();
1768 leader = new Leader(leaderActorContext);
1770 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1772 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1774 assertEquals(RaftState.Leader, raftActorBehavior.state());
1778 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1779 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1781 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1782 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1783 new FiniteDuration(1000, TimeUnit.SECONDS));
1784 // Note: the size here depends on estimate
1785 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(246);
1787 leaderActorContext.setReplicatedLog(
1788 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1789 long leaderCommitIndex = 3;
1790 leaderActorContext.setCommitIndex(leaderCommitIndex);
1791 leaderActorContext.setLastApplied(leaderCommitIndex);
1793 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1794 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1795 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1796 final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1798 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1800 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1801 followerActorContext.setCommitIndex(-1);
1802 followerActorContext.setLastApplied(-1);
1804 Follower follower = new Follower(followerActorContext);
1805 followerActor.underlyingActor().setBehavior(follower);
1806 followerActorContext.setCurrentBehavior(follower);
1808 leader = new Leader(leaderActorContext);
1810 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1811 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1812 AppendEntriesReply.class);
1814 MessageCollectorActor.clearMessages(followerActor);
1815 MessageCollectorActor.clearMessages(leaderActor);
1817 // Verify initial AppendEntries sent with the leader's current commit index.
1818 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1819 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1820 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1822 leaderActor.underlyingActor().setBehavior(leader);
1823 leaderActorContext.setCurrentBehavior(leader);
1825 leader.handleMessage(followerActor, appendEntriesReply);
1827 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1828 AppendEntries.class, 2);
1829 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1831 appendEntries = appendEntriesList.get(0);
1832 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1833 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1834 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1836 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1837 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1838 appendEntries.getEntries().get(0).getData());
1839 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1840 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1841 appendEntries.getEntries().get(1).getData());
1843 appendEntries = appendEntriesList.get(1);
1844 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1845 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1846 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1848 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1849 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1850 appendEntries.getEntries().get(0).getData());
1851 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1852 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1853 appendEntries.getEntries().get(1).getData());
1855 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1856 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1858 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1860 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1861 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1865 public void testHandleRequestVoteReply() {
1866 logStart("testHandleRequestVoteReply");
1868 MockRaftActorContext leaderActorContext = createActorContext();
1870 leader = new Leader(leaderActorContext);
1872 // Should be a no-op.
1873 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1874 new RequestVoteReply(1, true));
1876 assertEquals(RaftState.Leader, raftActorBehavior.state());
1878 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1880 assertEquals(RaftState.Leader, raftActorBehavior.state());
1884 public void testIsolatedLeaderCheckNoFollowers() {
1885 logStart("testIsolatedLeaderCheckNoFollowers");
1887 MockRaftActorContext leaderActorContext = createActorContext();
1889 leader = new Leader(leaderActorContext);
1890 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1891 assertTrue(newBehavior instanceof Leader);
1895 public void testIsolatedLeaderCheckNoVotingFollowers() {
1896 logStart("testIsolatedLeaderCheckNoVotingFollowers");
1898 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1899 Follower follower = new Follower(followerActorContext);
1900 followerActor.underlyingActor().setBehavior(follower);
1902 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1903 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1904 new FiniteDuration(1000, TimeUnit.SECONDS));
1905 leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1907 leader = new Leader(leaderActorContext);
1908 leader.getFollower(FOLLOWER_ID).markFollowerActive();
1909 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1910 assertTrue("Expected Leader", newBehavior instanceof Leader);
1913 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
1914 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1915 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1917 MockRaftActorContext leaderActorContext = createActorContext();
1919 Map<String, String> peerAddresses = new HashMap<>();
1920 peerAddresses.put("follower-1", followerActor1.path().toString());
1921 peerAddresses.put("follower-2", followerActor2.path().toString());
1923 leaderActorContext.setPeerAddresses(peerAddresses);
1924 leaderActorContext.setRaftPolicy(raftPolicy);
1926 leader = new Leader(leaderActorContext);
1928 leader.markFollowerActive("follower-1");
1929 leader.markFollowerActive("follower-2");
1930 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1931 assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1933 // kill 1 follower and verify if that got killed
1934 final TestKit probe = new TestKit(getSystem());
1935 probe.watch(followerActor1);
1936 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1937 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1938 assertEquals(termMsg1.getActor(), followerActor1);
1940 leader.markFollowerInActive("follower-1");
1941 leader.markFollowerActive("follower-2");
1942 newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1943 assertTrue("Behavior not instance of Leader when majority of followers are active",
1944 newBehavior instanceof Leader);
1946 // kill 2nd follower and leader should change to Isolated leader
1947 followerActor2.tell(PoisonPill.getInstance(), null);
1948 probe.watch(followerActor2);
1949 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1950 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1951 assertEquals(termMsg2.getActor(), followerActor2);
1953 leader.markFollowerInActive("follower-2");
1954 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1958 public void testIsolatedLeaderCheckTwoFollowers() {
1959 logStart("testIsolatedLeaderCheckTwoFollowers");
1961 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1963 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1964 newBehavior instanceof IsolatedLeader);
1968 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() {
1969 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1971 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1973 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1974 newBehavior instanceof Leader);
1978 public void testLaggingFollowerStarvation() {
1979 logStart("testLaggingFollowerStarvation");
1981 String leaderActorId = actorFactory.generateActorId("leader");
1982 String follower1ActorId = actorFactory.generateActorId("follower");
1983 String follower2ActorId = actorFactory.generateActorId("follower");
1985 final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1986 final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1988 MockRaftActorContext leaderActorContext =
1989 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1991 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1992 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1993 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1995 leaderActorContext.setConfigParams(configParams);
1997 leaderActorContext.setReplicatedLog(
1998 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
2000 Map<String, String> peerAddresses = new HashMap<>();
2001 peerAddresses.put(follower1ActorId,
2002 follower1Actor.path().toString());
2003 peerAddresses.put(follower2ActorId,
2004 follower2Actor.path().toString());
2006 leaderActorContext.setPeerAddresses(peerAddresses);
2007 leaderActorContext.getTermInformation().update(1, leaderActorId);
2009 leader = createBehavior(leaderActorContext);
2011 leaderActor.underlyingActor().setBehavior(leader);
2013 for (int i = 1; i < 6; i++) {
2014 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
2015 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
2016 new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
2017 assertTrue(newBehavior == leader);
2018 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
2021 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
2022 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
2024 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
2025 heartbeats.size() > 1);
2027 // Check if follower-2 got AppendEntries during this time and was not starved
2028 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
2030 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
2031 appendEntries.size() > 1);
2035 public void testReplicationConsensusWithNonVotingFollower() {
2036 logStart("testReplicationConsensusWithNonVotingFollower");
2038 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2039 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2040 new FiniteDuration(1000, TimeUnit.SECONDS));
2042 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2043 leaderActorContext.setCommitIndex(-1);
2044 leaderActorContext.setLastApplied(-1);
2046 String nonVotingFollowerId = "nonvoting-follower";
2047 ActorRef nonVotingFollowerActor = actorFactory.createActor(
2048 MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
2050 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2051 VotingState.NON_VOTING);
2053 leader = new Leader(leaderActorContext);
2054 leaderActorContext.setCurrentBehavior(leader);
2056 // Ignore initial heartbeats
2057 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2058 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2060 MessageCollectorActor.clearMessages(followerActor);
2061 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2062 MessageCollectorActor.clearMessages(leaderActor);
2064 // Send a Replicate message and wait for AppendEntries.
2065 sendReplicate(leaderActorContext, 0);
2067 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2068 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2070 // Send reply only from the voting follower and verify consensus via ApplyState.
2071 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2073 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2075 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2077 MessageCollectorActor.clearMessages(followerActor);
2078 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2079 MessageCollectorActor.clearMessages(leaderActor);
2081 // Send another Replicate message
2082 sendReplicate(leaderActorContext, 1);
2084 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2085 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2086 AppendEntries.class);
2087 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2088 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2090 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2091 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2093 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2095 // Send reply from the voting follower and verify consensus.
2096 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2098 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2102 public void testTransferLeadershipWithFollowerInSync() {
2103 logStart("testTransferLeadershipWithFollowerInSync");
2105 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2106 leaderActorContext.setLastApplied(-1);
2107 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2108 new FiniteDuration(1000, TimeUnit.SECONDS));
2109 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2111 leader = new Leader(leaderActorContext);
2112 leaderActorContext.setCurrentBehavior(leader);
2114 // Initial heartbeat
2115 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2116 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2117 MessageCollectorActor.clearMessages(followerActor);
2119 sendReplicate(leaderActorContext, 0);
2120 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2122 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2123 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2124 MessageCollectorActor.clearMessages(followerActor);
2126 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2127 leader.transferLeadership(mockTransferCohort);
2129 verify(mockTransferCohort, never()).transferComplete();
2130 doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2131 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2132 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2134 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2135 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2137 // Leader should force an election timeout
2138 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2140 verify(mockTransferCohort).transferComplete();
2144 public void testTransferLeadershipWithEmptyLog() {
2145 logStart("testTransferLeadershipWithEmptyLog");
2147 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2148 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2149 new FiniteDuration(1000, TimeUnit.SECONDS));
2150 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2152 leader = new Leader(leaderActorContext);
2153 leaderActorContext.setCurrentBehavior(leader);
2155 // Initial heartbeat
2156 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2157 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2158 MessageCollectorActor.clearMessages(followerActor);
2160 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2161 doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2162 leader.transferLeadership(mockTransferCohort);
2164 verify(mockTransferCohort, never()).transferComplete();
2165 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2166 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2168 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2169 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2171 // Leader should force an election timeout
2172 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2174 verify(mockTransferCohort).transferComplete();
2178 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2179 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2181 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2182 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2183 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2185 leader = new Leader(leaderActorContext);
2186 leaderActorContext.setCurrentBehavior(leader);
2188 // Initial heartbeat
2189 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2190 MessageCollectorActor.clearMessages(followerActor);
2192 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2193 doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2194 leader.transferLeadership(mockTransferCohort);
2196 verify(mockTransferCohort, never()).transferComplete();
2198 // Sync up the follower.
2199 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2200 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2201 MessageCollectorActor.clearMessages(followerActor);
2203 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2204 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2205 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2206 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2207 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2209 // Leader should force an election timeout
2210 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2212 verify(mockTransferCohort).transferComplete();
2216 public void testTransferLeadershipWithFollowerSyncTimeout() {
2217 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2219 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2220 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2221 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2222 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2223 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2225 leader = new Leader(leaderActorContext);
2226 leaderActorContext.setCurrentBehavior(leader);
2228 // Initial heartbeat
2229 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2230 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2231 MessageCollectorActor.clearMessages(followerActor);
2233 sendReplicate(leaderActorContext, 0);
2234 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2236 MessageCollectorActor.clearMessages(followerActor);
2238 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2239 leader.transferLeadership(mockTransferCohort);
2241 verify(mockTransferCohort, never()).transferComplete();
2243 // Send heartbeats to time out the transfer.
2244 for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2245 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2246 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2247 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2250 verify(mockTransferCohort).abortTransfer();
2251 verify(mockTransferCohort, never()).transferComplete();
2252 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2256 public void testReplicationWithPayloadSizeThatExceedsThreshold() {
2257 logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
2259 final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
2260 List.of(new SimpleReplicatedLogEntry(0, 1,
2261 new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
2262 final MockRaftActorContext.MockPayload largePayload =
2263 new MockRaftActorContext.MockPayload("large", serializedSize);
2265 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2266 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2267 new FiniteDuration(300, TimeUnit.MILLISECONDS));
2268 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
2269 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2270 leaderActorContext.setCommitIndex(-1);
2271 leaderActorContext.setLastApplied(-1);
2273 leader = new Leader(leaderActorContext);
2274 leaderActorContext.setCurrentBehavior(leader);
2276 // Send initial heartbeat reply so follower is marked active
2277 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2278 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2279 MessageCollectorActor.clearMessages(followerActor);
2281 // Send normal payload first to prime commit index.
2282 final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2283 sendReplicate(leaderActorContext, term, 0);
2285 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2286 assertEquals("Entries size", 1, appendEntries.getEntries().size());
2287 assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
2289 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
2290 assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
2291 MessageCollectorActor.clearMessages(followerActor);
2293 // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
2294 sendReplicate(leaderActorContext, term, 1, largePayload);
2296 MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2297 assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
2298 assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
2300 final Identifier slicingId = messageSlice.getIdentifier();
2302 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2303 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
2304 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
2305 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2306 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2307 MessageCollectorActor.clearMessages(followerActor);
2309 // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
2311 // Sleep for the heartbeat interval so AppendEntries is sent.
2312 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2313 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2315 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2317 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2318 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2319 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2320 MessageCollectorActor.clearMessages(followerActor);
2322 // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
2324 leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
2325 messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2326 assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
2328 leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
2330 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
2332 MessageCollectorActor.clearMessages(followerActor);
2334 // Send another normal payload.
2336 sendReplicate(leaderActorContext, term, 2);
2338 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2339 assertEquals("Entries size", 1, appendEntries.getEntries().size());
2340 assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
2341 assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
2345 public void testLargePayloadSlicingExpiration() {
2346 logStart("testLargePayloadSlicingExpiration");
2348 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2349 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2350 new FiniteDuration(100, TimeUnit.MILLISECONDS));
2351 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
2352 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
2353 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2354 leaderActorContext.setCommitIndex(-1);
2355 leaderActorContext.setLastApplied(-1);
2357 final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2358 leader = new Leader(leaderActorContext);
2359 leaderActorContext.setCurrentBehavior(leader);
2361 // Send initial heartbeat reply so follower is marked active
2362 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2363 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2364 MessageCollectorActor.clearMessages(followerActor);
2366 sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
2367 leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
2368 MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2370 // Sleep for at least 3 * election timeout so the slicing state expires.
2371 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2372 .getElectionTimeOutInterval().toMillis() * 3 + 50, TimeUnit.MILLISECONDS);
2373 MessageCollectorActor.clearMessages(followerActor);
2375 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2377 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2378 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2379 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2381 MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
2382 MessageCollectorActor.clearMessages(followerActor);
2384 // Send an AppendEntriesReply - this should restart the slicing.
2386 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2387 .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
2389 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
2391 MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2395 public void testLeaderAddressInAppendEntries() {
2396 logStart("testLeaderAddressInAppendEntries");
2398 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2399 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2400 FiniteDuration.create(50, TimeUnit.MILLISECONDS));
2401 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2402 leaderActorContext.setCommitIndex(-1);
2403 leaderActorContext.setLastApplied(-1);
2405 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
2406 peerId -> leaderActor.path().toString());
2408 leader = new Leader(leaderActorContext);
2409 leaderActorContext.setCurrentBehavior(leader);
2411 // Initial heartbeat shouldn't have the leader address
2413 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2414 assertFalse(appendEntries.getLeaderAddress().isPresent());
2415 MessageCollectorActor.clearMessages(followerActor);
2417 // Send AppendEntriesReply indicating the follower needs the leader address
2419 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
2420 RaftVersions.CURRENT_VERSION));
2422 // Sleep for the heartbeat interval so AppendEntries is sent.
2423 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2424 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2426 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2428 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2429 assertTrue(appendEntries.getLeaderAddress().isPresent());
2430 assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get());
2431 MessageCollectorActor.clearMessages(followerActor);
2433 // Send AppendEntriesReply indicating the follower does not need the leader address
2435 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
2436 RaftVersions.CURRENT_VERSION));
2438 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2439 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2441 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2443 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2444 assertFalse(appendEntries.getLeaderAddress().isPresent());
2448 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
2449 final ActorRef actorRef, final RaftRPC rpc) {
2450 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2451 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2454 private static class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2456 private final long electionTimeOutIntervalMillis;
2457 private final int snapshotChunkSize;
2459 MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
2460 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2461 this.snapshotChunkSize = snapshotChunkSize;
2465 public FiniteDuration getElectionTimeOutInterval() {
2466 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2470 public int getSnapshotChunkSize() {
2471 return snapshotChunkSize;