2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.behaviors;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.protobuf.ByteString;
26 import akka.testkit.TestActorRef;
27 import akka.testkit.javadsl.TestKit;
28 import com.google.common.collect.ImmutableList;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.io.ByteSource;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.io.IOException;
33 import java.io.OutputStream;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.List;
39 import java.util.Optional;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.apache.commons.lang3.SerializationUtils;
43 import org.junit.After;
44 import org.junit.Test;
45 import org.opendaylight.controller.cluster.messaging.MessageSlice;
46 import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
47 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
48 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
49 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
50 import org.opendaylight.controller.cluster.raft.RaftActorContext;
51 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
52 import org.opendaylight.controller.cluster.raft.RaftState;
53 import org.opendaylight.controller.cluster.raft.RaftVersions;
54 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
55 import org.opendaylight.controller.cluster.raft.VotingState;
56 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
57 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
58 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
59 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
60 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
61 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
62 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
63 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
64 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
65 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
66 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
67 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
68 import org.opendaylight.controller.cluster.raft.messages.Payload;
69 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
70 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
71 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
72 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
73 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
74 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
75 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
76 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
77 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
78 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
79 import org.opendaylight.yangtools.concepts.Identifier;
80 import scala.concurrent.duration.FiniteDuration;
82 public class LeaderTest extends AbstractLeaderTest<Leader> {
// Peer id used for the follower in AppendEntriesReply messages and getFollower() lookups.
84 static final String FOLLOWER_ID = "follower";
// Peer id for the leader. NOTE(review): not referenced in the visible tests - presumably used by the
// actor-context helpers; confirm.
85 public static final String LEADER_ID = "leader";
// Test actor passed as the sender when tests hand SendHeartBeat/Replicate to the leader; ApplyState
// messages are collected from it.
87 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
88 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
// Test actor standing in for the follower; tests inspect the AppendEntries/InstallSnapshot messages
// it collects.
90 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
91 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
// Leader behavior under test; each test assigns a fresh instance.
93 private Leader leader;
// Payload version set on the actor context and expected back in AppendEntries.getPayloadVersion().
94 private final short payloadVersion = 5;
// Test teardown hook. NOTE(review): the body is not among the visible lines - presumably it releases
// `leader`; confirm.
98 public void tearDown() {
/**
 * Verifies that a Leader returns null from handleMessage() when given a message it does not handle
 * (here a plain String).
 */
107 public void testHandleMessageForUnknownMessage() {
108 logStart("testHandleMessageForUnknownMessage");
110 leader = new Leader(createActorContext());
112 // handleMessage should return null when it receives an unknown message
113 assertNull(leader.handleMessage(followerActor, "foo"));
/**
 * Verifies the AppendEntries a new Leader sends to its follower: first an immediate heartbeat with no
 * entries, then, once the follower has replied and a SendHeartBeat is handled, a message carrying the
 * leader's last log entry.
 */
117 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
118 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
120 MockRaftActorContext actorContext = createActorContextWithFollower();
121 actorContext.setCommitIndex(-1);
122 actorContext.setPayloadVersion(payloadVersion);
// NOTE(review): `term` is used from here on but its declaration is not among the visible lines - confirm.
125 actorContext.getTermInformation().update(term, "");
127 leader = new Leader(actorContext);
128 actorContext.setCurrentBehavior(leader);
130 // Leader should send an immediate heartbeat with no entries as follower is inactive.
131 final long lastIndex = actorContext.getReplicatedLog().lastIndex();
132 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
133 assertEquals("getTerm", term, appendEntries.getTerm());
134 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
135 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
136 assertEquals("Entries size", 0, appendEntries.getEntries().size());
137 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
139 // The follower would normally reply - simulate that explicitly here.
// The reply reports lastIndex - 1 as the follower's log index, i.e. one entry behind the leader.
140 leader.handleMessage(followerActor, new AppendEntriesReply(
141 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
142 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard what the follower actor has collected so far so only the next AppendEntries is examined.
144 followerActor.underlyingActor().clear();
146 // Sleep for the heartbeat interval so AppendEntries is sent.
147 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
148 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
150 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
// The follower was one entry behind, so this AppendEntries is expected to carry exactly the last entry.
152 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
153 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
154 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
155 assertEquals("Entries size", 1, appendEntries.getEntries().size());
156 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
157 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
158 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
/**
 * Convenience overload: replicates a new entry at {@code index} by delegating to the term-taking
 * overload with term 1.
 */
162 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
163 return sendReplicate(actorContext, 1, index);
/**
 * Convenience overload: delegates to the payload-taking overload with a MockPayload of "foo".
 * NOTE(review): the remainder of the parameter list (declaring {@code index}) is not among the
 * visible lines.
 */
166 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term,
168 return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
/**
 * Builds a SimpleReplicatedLogEntry from {@code index}, {@code term} and {@code payload}, appends it
 * to the context's replicated log and hands the leader a Replicate message for it, with leaderActor
 * as the sender.
 *
 * @return whatever the leader's handleMessage() returns for the Replicate message
 */
171 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
172 final Payload payload) {
173 SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
174 actorContext.getReplicatedLog().append(newEntry);
// The first two Replicate constructor arguments are passed as null here.
175 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
/**
 * Verifies that a Replicate for a new entry makes the Leader send the active, up-to-date follower an
 * AppendEntries containing exactly that entry, and that the commit index is still the previous last
 * index at that point.
 */
179 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
180 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
182 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): `term` is used from here on but its declaration is not among the visible lines - confirm.
185 actorContext.getTermInformation().update(term, "");
187 leader = new Leader(actorContext);
189 // Leader will send an immediate heartbeat - ignore it.
190 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
192 // The follower would normally reply - simulate that explicitly here.
193 long lastIndex = actorContext.getReplicatedLog().lastIndex();
194 leader.handleMessage(followerActor, new AppendEntriesReply(
195 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
196 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only the AppendEntries triggered by Replicate is examined.
198 followerActor.underlyingActor().clear();
200 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
202 // State should not change
203 assertTrue(raftBehavior instanceof Leader);
205 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
206 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
207 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
208 assertEquals("Entries size", 1, appendEntries.getEntries().size());
209 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
210 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
211 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// No reply has been handled for the new entry yet, so the commit index is expected to be unchanged.
212 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
/**
 * Verifies that a Leader in a newer term does not advance the commit index for entries from the
 * previous term when the follower merely reports being up to date, and that the commit index moves
 * to the new entry's index once an entry from the current term is replicated and acknowledged.
 */
216 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() {
217 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
219 MockRaftActorContext actorContext = createActorContextWithFollower();
220 actorContext.setCommitIndex(-1);
221 actorContext.setLastApplied(-1);
223 // The raft context is initialized with a couple log entries. However the commitIndex
224 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
225 // committed and applied. Now it regains leadership with a higher term (2).
226 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
227 long newTerm = prevTerm + 1;
228 actorContext.getTermInformation().update(newTerm, "");
230 leader = new Leader(actorContext);
231 actorContext.setCurrentBehavior(leader);
233 // Leader will send an immediate heartbeat - ignore it.
234 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
236 // The follower replies with the leader's current last index and term, simulating that it is
237 // up to date with the leader.
238 long lastIndex = actorContext.getReplicatedLog().lastIndex();
239 leader.handleMessage(followerActor, new AppendEntriesReply(
240 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
242 // The commit index should not get updated even though consensus was reached. This is b/c the
243 // last entry's term does not match the current term. As per §5.4.1, "Raft never commits log entries
244 // from previous terms by counting replicas".
245 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
247 followerActor.underlyingActor().clear();
249 // Now replicate a new entry with the new term 2.
250 long newIndex = lastIndex + 1;
251 sendReplicate(actorContext, newTerm, newIndex);
253 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
254 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
255 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
256 assertEquals("Entries size", 1, appendEntries.getEntries().size());
257 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
258 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
259 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
261 // The follower replies with success. The leader should now update the commit index to the new index
262 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
263 // prior entries are committed indirectly".
264 leader.handleMessage(followerActor, new AppendEntriesReply(
265 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
267 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
/**
 * Verifies that, with the raft policy from createRaftPolicy(true, true) installed, handling a
 * Replicate advances the commit index to the new entry's index even though no AppendEntriesReply for
 * that entry has been handled yet.
 */
271 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() {
272 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
274 MockRaftActorContext actorContext = createActorContextWithFollower();
275 actorContext.setRaftPolicy(createRaftPolicy(true, true));
// NOTE(review): `term` is used from here on but its declaration is not among the visible lines - confirm.
278 actorContext.getTermInformation().update(term, "");
280 leader = new Leader(actorContext);
282 // Leader will send an immediate heartbeat - ignore it.
283 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
285 // The follower would normally reply - simulate that explicitly here.
286 long lastIndex = actorContext.getReplicatedLog().lastIndex();
287 leader.handleMessage(followerActor, new AppendEntriesReply(
288 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
289 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
291 followerActor.underlyingActor().clear();
293 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
295 // State should not change
296 assertTrue(raftBehavior instanceof Leader);
298 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
299 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
300 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
301 assertEquals("Entries size", 1, appendEntries.getEntries().size());
302 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
303 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
304 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// The follower has not acknowledged the new entry, yet the commit index is expected to include it.
305 assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
/**
 * Verifies that five back-to-back Replicate messages, sent without any intervening AppendEntriesReply
 * and within a 5 second heartbeat interval, result in only one AppendEntries reaching the follower.
 */
309 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
// Log this test's own name (previously a copy-pasted name of a different test).
310 logStart("testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent");
312 MockRaftActorContext actorContext = createActorContextWithFollower();
// Long heartbeat interval so that no heartbeat-driven AppendEntries interferes with the count below.
313 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
315 public FiniteDuration getHeartBeatInterval() {
316 return FiniteDuration.apply(5, TimeUnit.SECONDS);
// NOTE(review): `term` is used from here on but its declaration is not among the visible lines - confirm.
321 actorContext.getTermInformation().update(term, "");
323 leader = new Leader(actorContext);
325 // Leader will send an immediate heartbeat - ignore it.
326 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
328 // The follower would normally reply - simulate that explicitly here.
329 long lastIndex = actorContext.getReplicatedLog().lastIndex();
330 leader.handleMessage(followerActor, new AppendEntriesReply(
331 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
332 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
334 followerActor.underlyingActor().clear();
336 for (int i = 0; i < 5; i++) {
337 sendReplicate(actorContext, lastIndex + i + 1);
340 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
341 // We expect only 1 message to be sent because of two reasons,
342 // - an append entries reply was not received
343 // - the heartbeat interval has not expired
344 // In this scenario if multiple messages are sent they would likely be duplicates
345 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
/**
 * Verifies that when each Replicate is followed by a successful AppendEntriesReply, the follower
 * receives two AppendEntries per entry (the replication request and an empty message carrying the
 * advanced leaderCommit), and that of two further Replicates sent without a reply only the first
 * produces another AppendEntries.
 */
349 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
350 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
352 MockRaftActorContext actorContext = createActorContextWithFollower();
353 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
355 public FiniteDuration getHeartBeatInterval() {
356 return FiniteDuration.apply(5, TimeUnit.SECONDS);
// NOTE(review): `term` is used from here on but its declaration is not among the visible lines - confirm.
361 actorContext.getTermInformation().update(term, "");
363 leader = new Leader(actorContext);
365 // Leader will send an immediate heartbeat - ignore it.
366 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
368 // The follower would normally reply - simulate that explicitly here.
369 long lastIndex = actorContext.getReplicatedLog().lastIndex();
370 leader.handleMessage(followerActor, new AppendEntriesReply(
371 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
372 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
374 followerActor.underlyingActor().clear();
// Replicate three entries, acknowledging each one before the next is sent.
376 for (int i = 0; i < 3; i++) {
377 sendReplicate(actorContext, lastIndex + i + 1);
378 leader.handleMessage(followerActor, new AppendEntriesReply(
379 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
382 // We are expecting six messages here -- a request to replicate and a consensus-reached message
383 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
384 assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
385 for (int i = 0; i < 3; i++) {
386 assertRequestEntry(lastIndex, allMessages, i);
387 assertCommitEntry(lastIndex, allMessages, i);
390 // Now perform another commit, eliciting a request to persist
391 sendReplicate(actorContext, lastIndex + 3 + 1);
392 allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
393 // This elicits another message for request to replicate
394 assertEquals("The number of request entries collected", 7, allMessages.size());
395 assertRequestEntry(lastIndex, allMessages, 3);
// A second Replicate without a reply to the previous one must not add another AppendEntries.
397 sendReplicate(actorContext, lastIndex + 4 + 1);
398 allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
399 assertEquals("The number of request entries collected", 7, allMessages.size());
/**
 * Asserts that the message at odd position {@code 2 * messageNr + 1} in {@code allMessages} carries
 * no entries and has a leaderCommit of {@code lastIndex + messageNr + 1}.
 *
 * @param lastIndex the leader's last log index before the replicated entries were added
 * @param allMessages the AppendEntries messages collected by the follower, in order
 * @param messageNr zero-based number of the replicated entry being checked
 */
402 private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
403 final int messageNr) {
404 final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
405 assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
406 assertEquals(ImmutableList.of(), commitReq.getEntries());
/**
 * Asserts that the message at even position {@code 2 * messageNr} in {@code allMessages} has a
 * leaderCommit of {@code lastIndex + messageNr} and carries exactly one entry whose index is
 * {@code lastIndex + messageNr + 1}.
 *
 * @param lastIndex the leader's last log index before the replicated entries were added
 * @param allMessages the AppendEntries messages collected by the follower, in order
 * @param messageNr zero-based number of the replicated entry being checked
 */
409 private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
410 final int messageNr) {
411 final AppendEntries req = allMessages.get(2 * messageNr);
412 assertEquals(lastIndex + messageNr, req.getLeaderCommit());
414 final List<ReplicatedLogEntry> entries = req.getEntries();
415 assertEquals(1, entries.size());
// Derive the expected index from lastIndex rather than hard-coding the fixture's initial log size;
// callers replicate entry number messageNr at index lastIndex + messageNr + 1.
416 assertEquals(lastIndex + messageNr + 1, entries.get(0).getIndex());
/**
 * Verifies that an entry which has been replicated but not acknowledged is sent again when a
 * SendHeartBeat is handled after the (500 ms) heartbeat interval has elapsed: the follower ends up
 * with two AppendEntries, each carrying the same single entry.
 */
420 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
421 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
423 MockRaftActorContext actorContext = createActorContextWithFollower();
424 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
426 public FiniteDuration getHeartBeatInterval() {
427 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
// NOTE(review): `term` is used from here on but its declaration is not among the visible lines - confirm.
432 actorContext.getTermInformation().update(term, "");
434 leader = new Leader(actorContext);
436 // Leader will send an immediate heartbeat - ignore it.
437 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
439 // The follower would normally reply - simulate that explicitly here.
440 long lastIndex = actorContext.getReplicatedLog().lastIndex();
441 leader.handleMessage(followerActor, new AppendEntriesReply(
442 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
443 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
445 followerActor.underlyingActor().clear();
447 sendReplicate(actorContext, lastIndex + 1);
449 // Wait slightly longer than heartbeat duration
450 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
452 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
454 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
455 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
457 assertEquals(1, allMessages.get(0).getEntries().size());
458 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
459 assertEquals(1, allMessages.get(1).getEntries().size());
// Check the entry of the second (duplicate) message; this previously re-checked message 0.
460 assertEquals(lastIndex + 1, allMessages.get(1).getEntries().get(0).getIndex());
/**
 * Verifies that each SendHeartBeat handled after waiting longer than the (100 ms) heartbeat interval
 * produces an AppendEntries: three waits of 150 ms yield three messages at the follower.
 */
465 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
466 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
468 MockRaftActorContext actorContext = createActorContextWithFollower();
469 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
471 public FiniteDuration getHeartBeatInterval() {
472 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
// NOTE(review): `term` is used from here on but its declaration is not among the visible lines - confirm.
477 actorContext.getTermInformation().update(term, "");
479 leader = new Leader(actorContext);
481 // Leader will send an immediate heartbeat - ignore it.
482 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
484 // The follower would normally reply - simulate that explicitly here.
485 long lastIndex = actorContext.getReplicatedLog().lastIndex();
486 leader.handleMessage(followerActor, new AppendEntriesReply(
487 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
488 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
490 followerActor.underlyingActor().clear();
// Each iteration waits 150 ms, longer than the 100 ms heartbeat interval, before the heartbeat.
492 for (int i = 0; i < 3; i++) {
493 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
494 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
497 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
498 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
/**
 * Verifies that a Replicate handled right after a heartbeat is still sent out: the follower receives
 * an empty AppendEntries for the heartbeat followed by one carrying a single entry.
 */
502 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
503 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
505 MockRaftActorContext actorContext = createActorContextWithFollower();
506 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
508 public FiniteDuration getHeartBeatInterval() {
509 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
// NOTE(review): `term` is used from here on but its declaration is not among the visible lines - confirm.
514 actorContext.getTermInformation().update(term, "");
516 leader = new Leader(actorContext);
518 // Leader will send an immediate heartbeat - ignore it.
519 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
521 // The follower would normally reply - simulate that explicitly here.
522 long lastIndex = actorContext.getReplicatedLog().lastIndex();
523 leader.handleMessage(followerActor, new AppendEntriesReply(
524 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
525 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
527 followerActor.underlyingActor().clear();
// Wait past the 100 ms heartbeat interval, send a heartbeat, then replicate with no delay in between.
529 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
530 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
531 sendReplicate(actorContext, lastIndex + 1);
533 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
534 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// First message is the heartbeat (no entries); second carries the replicated entry.
536 assertEquals(0, allMessages.get(0).getEntries().size());
537 assertEquals(1, allMessages.get(1).getEntries().size());
/**
 * Verifies that a Leader created from a context without followers commits a replicated entry as soon
 * as the Replicate is handled, and that ApplyState messages are sent to the leader actor for every
 * entry after lastApplied, the last one carrying the Replicate's identifier.
 */
542 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
543 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
545 MockRaftActorContext actorContext = createActorContext();
547 leader = new Leader(actorContext);
549 actorContext.setLastApplied(0);
551 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
552 long term = actorContext.getTermInformation().getCurrentTerm();
553 ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
554 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
556 actorContext.getReplicatedLog().append(newEntry);
558 final Identifier id = new MockIdentifier("state-id");
559 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
560 new Replicate(leaderActor, id, newEntry, true));
562 // State should not change
563 assertTrue(raftBehavior instanceof Leader);
565 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
567 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
568 // one since lastApplied state is 0.
569 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
570 leaderActor, ApplyState.class);
571 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
// ApplyState i is expected to be for log index i + 1, i.e. the entries after lastApplied (0).
573 for (int i = 0; i <= newLogIndex - 1; i++) {
574 ApplyState applyState = applyStateList.get(i);
575 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
576 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
// The final ApplyState corresponds to the newly replicated entry and carries its identifier.
579 ApplyState last = applyStateList.get((int) newLogIndex - 1);
580 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
581 assertEquals("getIdentifier", id, last.getIdentifier());
/**
 * Verifies heartbeat handling while a snapshot install to the follower is in progress: with a chunk
 * outstanding, a SendHeartBeat results in an AppendEntries with no entries; after the chunk is marked
 * as sent successfully, the next SendHeartBeat results in an InstallSnapshot whose lastIncludedIndex
 * equals the commit index used for the snapshot.
 */
585 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
586 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
588 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Arbitrary snapshot content; it is serialized via toByteString() below.
590 Map<String, String> leadersSnapshot = new HashMap<>();
591 leadersSnapshot.put("1", "A");
592 leadersSnapshot.put("2", "B");
593 leadersSnapshot.put("3", "C");
// Remove every entry from the leader's replicated log.
596 actorContext.getReplicatedLog().removeFrom(0);
598 final int commitIndex = 3;
599 final int snapshotIndex = 2;
600 final int snapshotTerm = 1;
602 // set the snapshot variables in replicatedlog
603 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
604 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
605 actorContext.setCommitIndex(commitIndex);
606 //set follower timeout to 2 mins, helps during debugging
607 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
609 leader = new Leader(actorContext);
// Position the follower as having nothing replicated yet.
611 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
612 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
614 //update follower timestamp
615 leader.markFollowerActive(FOLLOWER_ID);
// Give the leader a snapshot and attach install-snapshot state for it to the follower.
617 ByteString bs = toByteString(leadersSnapshot);
618 leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
619 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
620 -1, null, null), ByteSource.wrap(bs.toByteArray())));
621 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
622 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
623 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
624 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
626 //send first chunk and no InstallSnapshotReply received yet
628 fts.incrementChunkIndex();
630 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
631 TimeUnit.MILLISECONDS);
633 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
635 AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
637 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
639 //InstallSnapshotReply received
640 fts.markSendStatus(true);
642 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
644 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
646 assertEquals(commitIndex, is.getLastIncludedIndex());
/**
 * Verifies that replicating a new entry while the leader's log has been emptied and its snapshot
 * index (3) is ahead of the commit index set for the follower (2) leaves the behavior as Leader and
 * puts the snapshot manager into the capturing state.
 */
650 public void testSendAppendEntriesSnapshotScenario() {
651 logStart("testSendAppendEntriesSnapshotScenario");
653 final MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): leadersSnapshot is not referenced again in the visible lines of this test - confirm
// whether it is still needed.
655 Map<String, String> leadersSnapshot = new HashMap<>();
656 leadersSnapshot.put("1", "A");
657 leadersSnapshot.put("2", "B");
658 leadersSnapshot.put("3", "C");
// Remove every entry from the leader's replicated log.
661 actorContext.getReplicatedLog().removeFrom(0);
663 final int followersLastIndex = 2;
664 final int snapshotIndex = 3;
665 final int newEntryIndex = 4;
666 final int snapshotTerm = 1;
667 final int currentTerm = 2;
669 // set the snapshot variables in replicatedlog
670 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
671 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
672 actorContext.setCommitIndex(followersLastIndex);
674 leader = new Leader(actorContext);
676 // Leader will send an immediate heartbeat - ignore it.
677 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
680 SimpleReplicatedLogEntry entry =
681 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
682 new MockRaftActorContext.MockPayload("D"));
684 actorContext.getReplicatedLog().append(entry);
686 //update follower timestamp
687 leader.markFollowerActive(FOLLOWER_ID);
689 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
690 RaftActorBehavior raftBehavior = leader.handleMessage(
691 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
693 assertTrue(raftBehavior instanceof Leader);
695 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
/**
 * Verifies that, with no snapshot held by the leader, a Replicate for a follower that is behind the
 * snapshot index starts a snapshot capture with the expected last-applied/last index and term values,
 * and that a second Replicate while the capture is in progress does not start another one.
 */
699 public void testInitiateInstallSnapshot() {
700 logStart("testInitiateInstallSnapshot");
702 MockRaftActorContext actorContext = createActorContextWithFollower();
// Remove every entry from the leader's replicated log.
705 actorContext.getReplicatedLog().removeFrom(0);
707 final int followersLastIndex = 2;
708 final int snapshotIndex = 3;
709 final int newEntryIndex = 4;
710 final int snapshotTerm = 1;
711 final int currentTerm = 2;
713 // set the snapshot variables in replicatedlog
714 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
715 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
716 actorContext.setLastApplied(3);
717 actorContext.setCommitIndex(followersLastIndex);
719 leader = new Leader(actorContext);
721 // Leader will send an immediate heartbeat - ignore it.
722 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
724 // set the snapshot as absent and check if capture-snapshot is invoked.
725 leader.setSnapshotHolder(null);
728 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
729 new MockRaftActorContext.MockPayload("D"));
731 actorContext.getReplicatedLog().append(entry);
733 //update follower timestamp
734 leader.markFollowerActive(FOLLOWER_ID);
736 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
738 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
740 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
// Expected values mirror the setup above: lastApplied 3 / snapshotTerm 1, and the appended entry's
// index 4 / term 2.
742 assertEquals(3, cs.getLastAppliedIndex());
743 assertEquals(1, cs.getLastAppliedTerm());
744 assertEquals(4, cs.getLastIndex());
745 assertEquals(2, cs.getLastTerm());
747 // if an initiate is started again while the first is in progress, it shouldn't initiate another capture
748 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
750 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
// Verifies the forced snapshot-install path: an AppendEntriesReply triggers a capture, repeated
// triggers during the capture do not start another one, persisting the snapshot sends an
// InstallSnapshot to the follower, and a further forcing reply does not re-send it.
754 public void testInitiateForceInstallSnapshot() throws Exception {
755 logStart("testInitiateForceInstallSnapshot");
757 MockRaftActorContext actorContext = createActorContextWithFollower();
759 final int followersLastIndex = 2;
760 final int snapshotIndex = -1;
761 final int newEntryIndex = 4;
762 final int snapshotTerm = -1;
763 final int currentTerm = 2;
765 // set the snapshot variables in replicatedlog
766 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
767 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
768 actorContext.setLastApplied(3);
769 actorContext.setCommitIndex(followersLastIndex);
771 actorContext.getReplicatedLog().removeFrom(0);
// Record the Optional<OutputStream> handed to the create-snapshot procedure so the test can
// assert it was invoked and later write the snapshot bytes to it.
773 AtomicReference<Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
774 actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
776 leader = new Leader(actorContext);
777 actorContext.setCurrentBehavior(leader);
779 // Leader will send an immediate heartbeat - ignore it.
780 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
782 // set the snapshot as absent and check if capture-snapshot is invoked.
783 leader.setSnapshotHolder(null);
// Populate the log with entries 0..3 in term 1, then append the new entry (index 4, term 2).
785 for (int i = 0; i < 4; i++) {
786 actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
787 new MockRaftActorContext.MockPayload("X" + i)));
791 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
792 new MockRaftActorContext.MockPayload("D"));
794 actorContext.getReplicatedLog().append(entry);
796 //update follower timestamp
797 leader.markFollowerActive(FOLLOWER_ID);
799 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
800 // installed with a SendInstallSnapshot
// NOTE(review): the 'true' argument is presumably the force-install-snapshot flag - confirm against
// the AppendEntriesReply constructor.
801 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
802 RaftVersions.CURRENT_VERSION));
804 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
806 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
// Expected values match the setup above: lastApplied = 3 (entries 0..3 are term 1) and the last
// log entry at index 4 / term 2.
807 assertEquals(3, cs.getLastAppliedIndex());
808 assertEquals(1, cs.getLastAppliedTerm());
809 assertEquals(4, cs.getLastIndex());
810 assertEquals(2, cs.getLastTerm());
812 assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
813 assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
815 MessageCollectorActor.clearMessages(followerActor);
817 // Sending Replicate message should not initiate another capture since the first is in progress.
818 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
819 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
821 // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
822 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
823 RaftVersions.CURRENT_VERSION));
824 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
826 // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
827 final byte[] bytes = new byte[]{1, 2, 3};
// Write the snapshot bytes to the captured install stream before persisting the same bytes as state.
828 installSnapshotStream.get().get().write(bytes);
829 actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
830 Runtime.getRuntime().totalMemory());
831 MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
833 // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
834 MessageCollectorActor.clearMessages(followerActor);
835 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
836 RaftVersions.CURRENT_VERSION));
// NOTE(review): 200 is presumably a wait duration in milliseconds - confirm against assertNoneMatching.
837 MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
// Verifies that handling SendInstallSnapshot keeps the Leader behavior and sends the follower an
// InstallSnapshot carrying the snapshot's last included index/term and the leader's current term.
842 public void testInstallSnapshot() {
843 logStart("testInstallSnapshot");
845 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Arbitrary map used as the snapshot payload.
847 Map<String, String> leadersSnapshot = new HashMap<>();
848 leadersSnapshot.put("1", "A");
849 leadersSnapshot.put("2", "B");
850 leadersSnapshot.put("3", "C");
853 actorContext.getReplicatedLog().removeFrom(0);
855 final int lastAppliedIndex = 3;
856 final int snapshotIndex = 2;
857 final int snapshotTerm = 1;
858 final int currentTerm = 2;
860 // set the snapshot variables in replicatedlog
861 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
862 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
863 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
864 actorContext.setCommitIndex(lastAppliedIndex);
865 actorContext.setLastApplied(lastAppliedIndex);
867 leader = new Leader(actorContext);
869 // Initial heartbeat.
870 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Set the follower's tracked state to matchIndex -1 and nextIndex 0.
872 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
873 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Build the snapshot from the serialized map; the same bytes are also passed as the ByteSource.
875 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
876 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
877 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
879 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
880 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
882 assertTrue(raftBehavior instanceof Leader);
884 // check if installsnapshot gets called with the correct values.
886 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
887 InstallSnapshot.class);
889 assertNotNull(installSnapshot.getData());
890 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
891 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
// The message term is the leader's current term (2), not the snapshot term (1).
893 assertEquals(currentTerm, installSnapshot.getTerm());
// Same scenario as testInstallSnapshot, but with snapshot index/term of -1 in the replicated log,
// the follower's nextIndex set to -1, and without removing the log entries first.
897 public void testForceInstallSnapshot() {
898 logStart("testForceInstallSnapshot");
900 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Arbitrary map used as the snapshot payload.
902 Map<String, String> leadersSnapshot = new HashMap<>();
903 leadersSnapshot.put("1", "A");
904 leadersSnapshot.put("2", "B");
905 leadersSnapshot.put("3", "C");
907 final int lastAppliedIndex = 3;
908 final int snapshotIndex = -1;
909 final int snapshotTerm = -1;
910 final int currentTerm = 2;
912 // set the snapshot variables in replicatedlog
913 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
914 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
915 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
916 actorContext.setCommitIndex(lastAppliedIndex);
917 actorContext.setLastApplied(lastAppliedIndex);
919 leader = new Leader(actorContext);
921 // Initial heartbeat.
922 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Both the match index and the next index are set to -1 for this follower.
924 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
925 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
// Build the snapshot from the serialized map; the same bytes are also passed as the ByteSource.
927 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
928 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
929 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
931 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
932 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
934 assertTrue(raftBehavior instanceof Leader);
936 // check if installsnapshot gets called with the correct values.
938 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
939 InstallSnapshot.class);
941 assertNotNull(installSnapshot.getData());
942 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
943 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
// The message term is the leader's current term (2).
945 assertEquals(currentTerm, installSnapshot.getTerm());
// Verifies that a successful InstallSnapshotReply for the last chunk clears the follower's install
// state, moves its match/next indexes to the snapshot's last index, and drops the leader's snapshot.
949 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
950 logStart("testHandleInstallSnapshotReplyLastChunk");
952 MockRaftActorContext actorContext = createActorContextWithFollower();
954 final int commitIndex = 3;
955 final int snapshotIndex = 2;
956 final int snapshotTerm = 1;
957 final int currentTerm = 2;
959 actorContext.setCommitIndex(commitIndex);
961 leader = new Leader(actorContext);
962 actorContext.setCurrentBehavior(leader);
// Set the follower's tracked state to matchIndex -1 and nextIndex 0.
964 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
965 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
967 // Ignore initial heartbeat.
968 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Arbitrary map used as the snapshot payload.
970 Map<String, String> leadersSnapshot = new HashMap<>();
971 leadersSnapshot.put("1", "A");
972 leadersSnapshot.put("2", "B");
973 leadersSnapshot.put("3", "C");
975 // set the snapshot variables in replicatedlog
977 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
978 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
979 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
// Give the leader a snapshot directly and attach install-snapshot state to the follower, instead
// of going through a SendInstallSnapshot message.
981 ByteString bs = toByteString(leadersSnapshot);
982 leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
983 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
984 -1, null, null), ByteSource.wrap(bs.toByteArray())));
985 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
986 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
987 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
988 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
// Advance the install state to its last chunk so the reply below acknowledges the final chunk.
989 while (!fts.isLastChunk(fts.getChunkIndex())) {
991 fts.incrementChunkIndex();
995 actorContext.getReplicatedLog().removeFrom(0);
997 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
998 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
1000 assertTrue(raftBehavior instanceof Leader);
1002 assertEquals(1, leader.followerLogSize());
1003 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
// After the final chunk is acknowledged: no install state, matchIndex at the snapshot's last
// index (commitIndex), nextIndex one past it, and no snapshot retained by the leader.
1005 assertNull(fli.getInstallSnapshotState());
1006 assertEquals(commitIndex, fli.getMatchIndex());
1007 assertEquals(commitIndex + 1, fli.getNextIndex());
1008 assertFalse(leader.hasSnapshot());
// Verifies that each successful InstallSnapshotReply makes the leader send the next snapshot chunk,
// and that a reply to the final chunk does not produce another InstallSnapshot message.
1012 public void testSendSnapshotfromInstallSnapshotReply() {
1013 logStart("testSendSnapshotfromInstallSnapshotReply");
1015 MockRaftActorContext actorContext = createActorContextWithFollower();
1017 final int commitIndex = 3;
1018 final int snapshotIndex = 2;
1019 final int snapshotTerm = 1;
1020 final int currentTerm = 2;
// Override the snapshot chunk size so the snapshot is split into several chunks (3, per the
// totalChunks assertions below).
1022 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
1024 public int getSnapshotChunkSize() {
// Heartbeat interval of 9 seconds and isolated-leader check interval of 10 seconds - presumably
// long enough that neither fires during the test.
1028 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1029 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1031 actorContext.setConfigParams(configParams);
1032 actorContext.setCommitIndex(commitIndex);
1034 leader = new Leader(actorContext);
1035 actorContext.setCurrentBehavior(leader);
1037 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1038 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary map used as the snapshot payload.
1040 Map<String, String> leadersSnapshot = new HashMap<>();
1041 leadersSnapshot.put("1", "A");
1042 leadersSnapshot.put("2", "B");
1043 leadersSnapshot.put("3", "C");
1045 // set the snapshot variables in replicatedlog
1046 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1047 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1048 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1050 ByteString bs = toByteString(leadersSnapshot);
1051 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1052 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1055 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
// Chunk 1 of 3 is sent as soon as SendInstallSnapshot is handled.
1057 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1058 InstallSnapshot.class);
1060 assertEquals(1, installSnapshot.getChunkIndex());
1061 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 1; chunk 2 is expected next.
1063 followerActor.underlyingActor().clear();
1064 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1065 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1067 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1069 assertEquals(2, installSnapshot.getChunkIndex());
1070 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 2; a further InstallSnapshot (the third chunk) is expected.
1072 followerActor.underlyingActor().clear();
1073 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1074 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1076 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1078 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1079 followerActor.underlyingActor().clear();
1080 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1081 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
// getFirstMatching (unlike expectFirstMatching) is used here because a null result is the
// expected outcome.
1083 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1085 assertNull(installSnapshot);
// Verifies that after a failed InstallSnapshotReply carrying chunk index -1, the next heartbeat
// makes the leader send chunk 1 again.
1090 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() {
1091 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1093 MockRaftActorContext actorContext = createActorContextWithFollower();
1095 final int commitIndex = 3;
1096 final int snapshotIndex = 2;
1097 final int snapshotTerm = 1;
1098 final int currentTerm = 2;
// Override the snapshot chunk size so the snapshot is split into several chunks (3, per the
// totalChunks assertions below).
1100 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1102 public int getSnapshotChunkSize() {
1107 actorContext.setCommitIndex(commitIndex);
1109 leader = new Leader(actorContext);
1111 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1112 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary map used as the snapshot payload.
1114 Map<String, String> leadersSnapshot = new HashMap<>();
1115 leadersSnapshot.put("1", "A");
1116 leadersSnapshot.put("2", "B");
1117 leadersSnapshot.put("3", "C");
1119 // set the snapshot variables in replicatedlog
1120 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1121 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1122 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1124 ByteString bs = toByteString(leadersSnapshot);
1125 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1126 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
// NOTE(review): the reason for this 1 second pause is not evident from the test itself - confirm
// whether it is still required.
1129 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1130 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1132 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1133 InstallSnapshot.class);
1135 assertEquals(1, installSnapshot.getChunkIndex());
1136 assertEquals(3, installSnapshot.getTotalChunks());
1138 followerActor.underlyingActor().clear();
// Reply with chunk index -1 and success = false.
1140 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1141 FOLLOWER_ID, -1, false));
// Wait one heartbeat interval before delivering SendHeartBeat to the leader.
1143 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1144 TimeUnit.MILLISECONDS);
1146 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1148 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
// The chunk index is 1 again: the install did not advance past the first chunk.
1150 assertEquals(1, installSnapshot.getChunkIndex());
1151 assertEquals(3, installSnapshot.getTotalChunks());
// Verifies the lastChunkHashCode field of InstallSnapshot: the first chunk carries the initial
// constant, and the second chunk carries the hash code of the first chunk's data.
1155 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() {
1156 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1158 MockRaftActorContext actorContext = createActorContextWithFollower();
1160 final int commitIndex = 3;
1161 final int snapshotIndex = 2;
1162 final int snapshotTerm = 1;
1163 final int currentTerm = 2;
// Override the snapshot chunk size so the snapshot is split into several chunks (3, per the
// totalChunks assertions below).
1165 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1167 public int getSnapshotChunkSize() {
1172 actorContext.setCommitIndex(commitIndex);
1174 leader = new Leader(actorContext);
1176 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1177 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary map used as the snapshot payload.
1179 Map<String, String> leadersSnapshot = new HashMap<>();
1180 leadersSnapshot.put("1", "A");
1181 leadersSnapshot.put("2", "B");
1182 leadersSnapshot.put("3", "C");
1184 // set the snapshot variables in replicatedlog
1185 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1186 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1187 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1189 ByteString bs = toByteString(leadersSnapshot);
1190 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1191 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1194 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1196 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1197 InstallSnapshot.class);
1199 assertEquals(1, installSnapshot.getChunkIndex());
1200 assertEquals(3, installSnapshot.getTotalChunks());
// There is no previous chunk for the first message, so the initial constant is expected.
1201 assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1202 installSnapshot.getLastChunkHashCode().getAsInt());
// Remember the hash of chunk 1's data to compare against what chunk 2 reports.
1204 final int hashCode = Arrays.hashCode(installSnapshot.getData());
1206 followerActor.underlyingActor().clear();
// Acknowledge chunk 1 so the leader sends chunk 2.
1208 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1209 FOLLOWER_ID, 1, true));
1211 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1213 assertEquals(2, installSnapshot.getChunkIndex());
1214 assertEquals(3, installSnapshot.getTotalChunks());
1215 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().getAsInt());
// Exercises LeaderInstallSnapshotState directly with a chunk size of 50: walks the serialized
// snapshot in 50-byte steps and checks each chunk's length, the reported chunk index, and finally
// the total chunk count.
1219 public void testLeaderInstallSnapshotState() throws IOException {
1220 logStart("testLeaderInstallSnapshotState");
// Arbitrary map used as the snapshot payload.
1222 Map<String, String> leadersSnapshot = new HashMap<>();
1223 leadersSnapshot.put("1", "A");
1224 leadersSnapshot.put("2", "B");
1225 leadersSnapshot.put("3", "C");
1227 ByteString bs = toByteString(leadersSnapshot);
1228 byte[] barray = bs.toByteArray();
1230 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1231 fts.setSnapshotBytes(ByteSource.wrap(barray));
1233 assertEquals(bs.size(), barray.length);
// i is the start offset of the current chunk; length is its end offset, capped at the array
// length for the final (possibly shorter) chunk.
1236 for (int i = 0; i < barray.length; i = i + 50) {
1237 int length = i + 50;
1240 if (i + 50 > barray.length) {
1241 length = barray.length;
1244 byte[] chunk = fts.getNextChunk();
1245 assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1246 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
// Mark the chunk as successfully sent and move on unless this was the last one.
1248 fts.markSendStatus(true);
1249 if (!fts.isLastChunk(chunkIndex)) {
1250 fts.incrementChunkIndex();
// The number of chunks walked must equal the total the state object reports.
1254 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
// Creates the behavior under test: a new Leader bound to the given actor context.
1259 protected Leader createBehavior(final RaftActorContext actorContext) {
1260 return new Leader(actorContext);
// Creates a mock actor context backed by leaderActor.
1264 protected MockRaftActorContext createActorContext() {
1265 return createActorContext(leaderActor);
// Creates a mock actor context with the id LEADER_ID, backed by the given actor.
1269 protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
1270 return createActorContext(LEADER_ID, actorRef);
// Builds a MockRaftActorContext for the given id and actor, configured with a 50 ms heartbeat
// interval, an election timeout factor of 100000, and this test's payloadVersion.
1273 private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
1274 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1275 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
// NOTE(review): the large factor presumably keeps election timeouts from firing during a test - confirm.
1276 configParams.setElectionTimeoutFactor(100000);
1277 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1278 context.setConfigParams(configParams);
1279 context.setPayloadVersion(payloadVersion);
// Creates the leader's actor context with a single peer: FOLLOWER_ID mapped to followerActor's path.
1283 private MockRaftActorContext createActorContextWithFollower() {
1284 MockRaftActorContext actorContext = createActorContext();
1285 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1286 followerActor.path().toString()).build());
1287 return actorContext;
// Creates a follower-side actor context (FOLLOWER_ID / followerActor) whose only peer is the
// leader, LEADER_ID mapped to leaderActor's path.
1290 private MockRaftActorContext createFollowerActorContextWithLeader() {
1291 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Replace the config set by createActorContext with a fresh one that only overrides the election
// timeout factor (10000); all other settings are DefaultConfigParamsImpl defaults.
1292 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1293 followerConfig.setElectionTimeoutFactor(10000);
1294 followerActorContext.setConfigParams(followerConfig);
1295 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1296 return followerActorContext;
// Verifies the initial AppendEntries sent by a newly created Leader whose commit index (1) is below
// its last log index, and the reply a real Follower with an identical log sends back.
1300 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() {
1301 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1303 final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1305 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Install a real Follower behavior on followerActor so it replies to the leader's messages.
1307 Follower follower = new Follower(followerActorContext);
1308 followerActor.underlyingActor().setBehavior(follower);
1309 followerActorContext.setCurrentBehavior(follower);
1311 Map<String, String> peerAddresses = new HashMap<>();
1312 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1314 leaderActorContext.setPeerAddresses(peerAddresses);
1316 leaderActorContext.getReplicatedLog().removeFrom(0);
// Leader log built with createEntries(0, 3, 1); the reply assertions below expect last index 2
// and term 1 for the identical follower log.
1319 leaderActorContext.setReplicatedLog(
1320 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1322 leaderActorContext.setCommitIndex(1);
1324 followerActorContext.getReplicatedLog().removeFrom(0);
1326 // follower too has the exact same log entries and has the same commit index
1327 followerActorContext.setReplicatedLog(
1328 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1330 followerActorContext.setCommitIndex(1);
1332 leader = new Leader(leaderActorContext);
1333 leaderActorContext.setCurrentBehavior(leader);
// The initial AppendEntries is expected to carry leaderCommit -1, no entries and prevLogIndex 0.
1335 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1337 assertEquals(-1, appendEntries.getLeaderCommit());
1338 assertEquals(0, appendEntries.getEntries().size());
1339 assertEquals(0, appendEntries.getPrevLogIndex());
1341 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1342 leaderActor, AppendEntriesReply.class);
1344 assertEquals(2, appendEntriesReply.getLogLastIndex());
1345 assertEquals(1, appendEntriesReply.getLogLastTerm());
1347 // follower returns its next index
// NOTE(review): the two assertions below repeat the pair directly above on the same reply object.
1348 assertEquals(2, appendEntriesReply.getLogLastIndex());
1349 assertEquals(1, appendEntriesReply.getLogLastTerm());
// Verifies the AppendEntries exchange when the follower's commit index (2) is ahead of the leader's
// initial commit index (1): the first heartbeat carries leaderCommit -1, and after the reply is
// handled the next heartbeat carries leaderCommit 2 and prevLogIndex 2.
1355 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() {
1356 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1358 final MockRaftActorContext leaderActorContext = createActorContext();
1360 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1361 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
// Install a real Follower behavior on followerActor so it replies to the leader's messages.
1363 Follower follower = new Follower(followerActorContext);
1364 followerActor.underlyingActor().setBehavior(follower);
1365 followerActorContext.setCurrentBehavior(follower);
1367 Map<String, String> leaderPeerAddresses = new HashMap<>();
1368 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1370 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1372 leaderActorContext.getReplicatedLog().removeFrom(0);
1374 leaderActorContext.setReplicatedLog(
1375 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1377 leaderActorContext.setCommitIndex(1);
1379 followerActorContext.getReplicatedLog().removeFrom(0);
1381 followerActorContext.setReplicatedLog(
1382 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1384 // follower has the same log entries but its commit index > leaders commit index
1385 followerActorContext.setCommitIndex(2);
1387 leader = new Leader(leaderActorContext);
1389 // Initial heartbeat
1390 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1392 assertEquals(-1, appendEntries.getLeaderCommit());
1393 assertEquals(0, appendEntries.getEntries().size());
1394 assertEquals(0, appendEntries.getPrevLogIndex());
1396 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1397 leaderActor, AppendEntriesReply.class);
1399 assertEquals(2, appendEntriesReply.getLogLastIndex());
1400 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): this installs the Follower behavior on leaderActor, whereas other tests in this
// class pass the leader at the equivalent step - confirm this is intentional.
1402 leaderActor.underlyingActor().setBehavior(follower);
1403 leader.handleMessage(followerActor, appendEntriesReply);
// Discard everything collected so far so the assertions below only see the next exchange.
1405 leaderActor.underlyingActor().clear();
1406 followerActor.underlyingActor().clear();
// Wait one heartbeat interval before delivering SendHeartBeat to the leader.
1408 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1409 TimeUnit.MILLISECONDS);
1411 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1413 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1415 assertEquals(2, appendEntries.getLeaderCommit());
1416 assertEquals(0, appendEntries.getEntries().size());
1417 assertEquals(2, appendEntries.getPrevLogIndex());
1419 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1421 assertEquals(2, appendEntriesReply.getLogLastIndex());
1422 assertEquals(1, appendEntriesReply.getLogLastTerm());
// The follower's commit index is expected to still be 2 after the exchange.
1424 assertEquals(2, followerActorContext.getCommitIndex());
// Verifies that when the follower's log is behind the leader's, handling the follower's reply to the
// initial AppendEntries makes the leader send the missing entries (indexes 1 and 2), which the
// follower then applies, ending with commit index and last index both at 2.
1430 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1431 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1433 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// Heartbeat interval of 1000 seconds - presumably so that no scheduled heartbeat interferes with
// the messages asserted below.
1434 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1435 new FiniteDuration(1000, TimeUnit.SECONDS));
1437 leaderActorContext.setReplicatedLog(
1438 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1439 long leaderCommitIndex = 2;
1440 leaderActorContext.setCommitIndex(leaderCommitIndex);
1441 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the leader's entries at indexes 1 and 2 for the data comparisons below.
1443 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1444 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1446 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// The follower's log is built with createEntries(0, 1, 1), with commit index and last applied at 0.
1448 followerActorContext.setReplicatedLog(
1449 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1450 followerActorContext.setCommitIndex(0);
1451 followerActorContext.setLastApplied(0);
1453 Follower follower = new Follower(followerActorContext);
1454 followerActor.underlyingActor().setBehavior(follower);
1456 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply, then clear both collectors so the
// later expectations only match messages from the second exchange.
1458 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1459 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1460 AppendEntriesReply.class);
1462 MessageCollectorActor.clearMessages(followerActor);
1463 MessageCollectorActor.clearMessages(leaderActor);
1465 // Verify initial AppendEntries sent.
1466 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1467 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1468 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
// Route messages arriving at leaderActor to the leader, then handle the captured reply directly.
1470 leaderActor.underlyingActor().setBehavior(leader);
1472 leader.handleMessage(followerActor, appendEntriesReply);
1474 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1475 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// The follow-up AppendEntries carries the leader's commit index, prevLogIndex 0 and the two
// entries the follower is missing.
1477 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1478 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1479 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1481 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1482 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1483 appendEntries.getEntries().get(0).getData());
1484 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1485 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1486 appendEntries.getEntries().get(1).getData());
1488 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1489 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
// The follower is expected to emit one ApplyState per newly received entry, in index order.
1491 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1493 ApplyState applyState = applyStateList.get(0);
1494 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1495 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1496 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1497 applyState.getReplicatedLogEntry().getData());
1499 applyState = applyStateList.get(1);
1500 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1501 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1502 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1503 applyState.getReplicatedLogEntry().getData());
1505 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1506 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1510 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1511 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1513 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1514 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1515 new FiniteDuration(1000, TimeUnit.SECONDS));
1517 leaderActorContext.setReplicatedLog(
1518 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1519 long leaderCommitIndex = 1;
1520 leaderActorContext.setCommitIndex(leaderCommitIndex);
1521 leaderActorContext.setLastApplied(leaderCommitIndex);
1523 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1524 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1526 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1528 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1529 followerActorContext.setCommitIndex(-1);
1530 followerActorContext.setLastApplied(-1);
1532 Follower follower = new Follower(followerActorContext);
1533 followerActor.underlyingActor().setBehavior(follower);
1534 followerActorContext.setCurrentBehavior(follower);
1536 leader = new Leader(leaderActorContext);
1538 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1539 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1540 AppendEntriesReply.class);
1542 MessageCollectorActor.clearMessages(followerActor);
1543 MessageCollectorActor.clearMessages(leaderActor);
1545 // Verify initial AppendEntries sent with the leader's current commit index.
1546 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1547 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1548 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1550 leaderActor.underlyingActor().setBehavior(leader);
1551 leaderActorContext.setCurrentBehavior(leader);
1553 leader.handleMessage(followerActor, appendEntriesReply);
1555 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1556 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1558 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1559 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1560 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1562 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1563 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1564 appendEntries.getEntries().get(0).getData());
1565 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1566 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1567 appendEntries.getEntries().get(1).getData());
1569 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1570 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1572 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1574 ApplyState applyState = applyStateList.get(0);
1575 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1576 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1577 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1578 applyState.getReplicatedLogEntry().getData());
1580 applyState = applyStateList.get(1);
1581 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1582 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1583 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1584 applyState.getReplicatedLogEntry().getData());
1586 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1587 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1591 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1592 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1594 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1595 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1596 new FiniteDuration(1000, TimeUnit.SECONDS));
1598 leaderActorContext.setReplicatedLog(
1599 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1600 long leaderCommitIndex = 1;
1601 leaderActorContext.setCommitIndex(leaderCommitIndex);
1602 leaderActorContext.setLastApplied(leaderCommitIndex);
1604 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1605 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1607 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1609 followerActorContext.setReplicatedLog(
1610 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1611 followerActorContext.setCommitIndex(-1);
1612 followerActorContext.setLastApplied(-1);
1614 Follower follower = new Follower(followerActorContext);
1615 followerActor.underlyingActor().setBehavior(follower);
1616 followerActorContext.setCurrentBehavior(follower);
1618 leader = new Leader(leaderActorContext);
1620 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1621 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1622 AppendEntriesReply.class);
1624 MessageCollectorActor.clearMessages(followerActor);
1625 MessageCollectorActor.clearMessages(leaderActor);
1627 // Verify initial AppendEntries sent with the leader's current commit index.
1628 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1629 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1630 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1632 leaderActor.underlyingActor().setBehavior(leader);
1633 leaderActorContext.setCurrentBehavior(leader);
1635 leader.handleMessage(followerActor, appendEntriesReply);
1637 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1638 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1640 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1641 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1642 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1644 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1645 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1646 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1647 appendEntries.getEntries().get(0).getData());
1648 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1649 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1650 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1651 appendEntries.getEntries().get(1).getData());
1653 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1654 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1656 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1658 ApplyState applyState = applyStateList.get(0);
1659 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1660 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1661 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1662 applyState.getReplicatedLogEntry().getData());
1664 applyState = applyStateList.get(1);
1665 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1666 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1667 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1668 applyState.getReplicatedLogEntry().getData());
1670 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1671 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1672 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1676 public void testHandleAppendEntriesReplyWithNewerTerm() {
1677 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1679 MockRaftActorContext leaderActorContext = createActorContext();
1680 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1681 new FiniteDuration(10000, TimeUnit.SECONDS));
1683 leaderActorContext.setReplicatedLog(
1684 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1686 leader = new Leader(leaderActorContext);
1687 leaderActor.underlyingActor().setBehavior(leader);
1688 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1690 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1691 AppendEntriesReply.class);
1693 assertEquals(false, appendEntriesReply.isSuccess());
1694 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1696 MessageCollectorActor.clearMessages(leaderActor);
1700 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1701 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1703 MockRaftActorContext leaderActorContext = createActorContext();
1704 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1705 new FiniteDuration(10000, TimeUnit.SECONDS));
1707 leaderActorContext.setReplicatedLog(
1708 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1709 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1711 leader = new Leader(leaderActorContext);
1712 leaderActor.underlyingActor().setBehavior(leader);
1713 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1715 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1716 AppendEntriesReply.class);
1718 assertEquals(false, appendEntriesReply.isSuccess());
1719 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1721 MessageCollectorActor.clearMessages(leaderActor);
1725 public void testHandleAppendEntriesReplySuccess() {
1726 logStart("testHandleAppendEntriesReplySuccess");
1728 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1730 leaderActorContext.setReplicatedLog(
1731 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1733 leaderActorContext.setCommitIndex(1);
1734 leaderActorContext.setLastApplied(1);
1735 leaderActorContext.getTermInformation().update(1, "leader");
1737 leader = new Leader(leaderActorContext);
1739 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1741 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1742 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1744 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1746 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1748 assertEquals(RaftState.Leader, raftActorBehavior.state());
1750 assertEquals(2, leaderActorContext.getCommitIndex());
1752 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1753 leaderActor, ApplyJournalEntries.class);
1755 assertEquals(2, leaderActorContext.getLastApplied());
1757 assertEquals(2, applyJournalEntries.getToIndex());
1759 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1762 assertEquals(1,applyStateList.size());
1764 ApplyState applyState = applyStateList.get(0);
1766 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1768 assertEquals(2, followerInfo.getMatchIndex());
1769 assertEquals(3, followerInfo.getNextIndex());
1770 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1771 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1775 public void testHandleAppendEntriesReplyUnknownFollower() {
1776 logStart("testHandleAppendEntriesReplyUnknownFollower");
1778 MockRaftActorContext leaderActorContext = createActorContext();
1780 leader = new Leader(leaderActorContext);
1782 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1784 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1786 assertEquals(RaftState.Leader, raftActorBehavior.state());
1790 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1791 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1793 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1794 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1795 new FiniteDuration(1000, TimeUnit.SECONDS));
1796 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1798 leaderActorContext.setReplicatedLog(
1799 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1800 long leaderCommitIndex = 3;
1801 leaderActorContext.setCommitIndex(leaderCommitIndex);
1802 leaderActorContext.setLastApplied(leaderCommitIndex);
1804 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1805 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1806 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1807 final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1809 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1811 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1812 followerActorContext.setCommitIndex(-1);
1813 followerActorContext.setLastApplied(-1);
1815 Follower follower = new Follower(followerActorContext);
1816 followerActor.underlyingActor().setBehavior(follower);
1817 followerActorContext.setCurrentBehavior(follower);
1819 leader = new Leader(leaderActorContext);
1821 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1822 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1823 AppendEntriesReply.class);
1825 MessageCollectorActor.clearMessages(followerActor);
1826 MessageCollectorActor.clearMessages(leaderActor);
1828 // Verify initial AppendEntries sent with the leader's current commit index.
1829 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1830 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1831 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1833 leaderActor.underlyingActor().setBehavior(leader);
1834 leaderActorContext.setCurrentBehavior(leader);
1836 leader.handleMessage(followerActor, appendEntriesReply);
1838 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1839 AppendEntries.class, 2);
1840 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1842 appendEntries = appendEntriesList.get(0);
1843 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1844 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1845 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1847 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1848 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1849 appendEntries.getEntries().get(0).getData());
1850 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1851 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1852 appendEntries.getEntries().get(1).getData());
1854 appendEntries = appendEntriesList.get(1);
1855 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1856 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1857 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1859 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1860 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1861 appendEntries.getEntries().get(0).getData());
1862 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1863 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1864 appendEntries.getEntries().get(1).getData());
1866 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1867 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1869 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1871 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1872 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1876 public void testHandleRequestVoteReply() {
1877 logStart("testHandleRequestVoteReply");
1879 MockRaftActorContext leaderActorContext = createActorContext();
1881 leader = new Leader(leaderActorContext);
1883 // Should be a no-op.
1884 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1885 new RequestVoteReply(1, true));
1887 assertEquals(RaftState.Leader, raftActorBehavior.state());
1889 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1891 assertEquals(RaftState.Leader, raftActorBehavior.state());
1895 public void testIsolatedLeaderCheckNoFollowers() {
1896 logStart("testIsolatedLeaderCheckNoFollowers");
1898 MockRaftActorContext leaderActorContext = createActorContext();
1900 leader = new Leader(leaderActorContext);
1901 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1902 assertTrue(newBehavior instanceof Leader);
1906 public void testIsolatedLeaderCheckNoVotingFollowers() {
1907 logStart("testIsolatedLeaderCheckNoVotingFollowers");
1909 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1910 Follower follower = new Follower(followerActorContext);
1911 followerActor.underlyingActor().setBehavior(follower);
1913 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1914 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1915 new FiniteDuration(1000, TimeUnit.SECONDS));
1916 leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1918 leader = new Leader(leaderActorContext);
1919 leader.getFollower(FOLLOWER_ID).markFollowerActive();
1920 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1921 assertTrue("Expected Leader", newBehavior instanceof Leader);
1924 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
1925 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1926 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1928 MockRaftActorContext leaderActorContext = createActorContext();
1930 Map<String, String> peerAddresses = new HashMap<>();
1931 peerAddresses.put("follower-1", followerActor1.path().toString());
1932 peerAddresses.put("follower-2", followerActor2.path().toString());
1934 leaderActorContext.setPeerAddresses(peerAddresses);
1935 leaderActorContext.setRaftPolicy(raftPolicy);
1937 leader = new Leader(leaderActorContext);
1939 leader.markFollowerActive("follower-1");
1940 leader.markFollowerActive("follower-2");
1941 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1942 assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1944 // kill 1 follower and verify if that got killed
1945 final TestKit probe = new TestKit(getSystem());
1946 probe.watch(followerActor1);
1947 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1948 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1949 assertEquals(termMsg1.getActor(), followerActor1);
1951 leader.markFollowerInActive("follower-1");
1952 leader.markFollowerActive("follower-2");
1953 newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1954 assertTrue("Behavior not instance of Leader when majority of followers are active",
1955 newBehavior instanceof Leader);
1957 // kill 2nd follower and leader should change to Isolated leader
1958 followerActor2.tell(PoisonPill.getInstance(), null);
1959 probe.watch(followerActor2);
1960 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1961 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1962 assertEquals(termMsg2.getActor(), followerActor2);
1964 leader.markFollowerInActive("follower-2");
1965 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1969 public void testIsolatedLeaderCheckTwoFollowers() {
1970 logStart("testIsolatedLeaderCheckTwoFollowers");
1972 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1974 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1975 newBehavior instanceof IsolatedLeader);
1979 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() {
1980 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1982 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1984 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1985 newBehavior instanceof Leader);
1989 public void testLaggingFollowerStarvation() {
1990 logStart("testLaggingFollowerStarvation");
1992 String leaderActorId = actorFactory.generateActorId("leader");
1993 String follower1ActorId = actorFactory.generateActorId("follower");
1994 String follower2ActorId = actorFactory.generateActorId("follower");
1996 final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1997 final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1999 MockRaftActorContext leaderActorContext =
2000 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
2002 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
2003 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
2004 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
2006 leaderActorContext.setConfigParams(configParams);
2008 leaderActorContext.setReplicatedLog(
2009 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
2011 Map<String, String> peerAddresses = new HashMap<>();
2012 peerAddresses.put(follower1ActorId,
2013 follower1Actor.path().toString());
2014 peerAddresses.put(follower2ActorId,
2015 follower2Actor.path().toString());
2017 leaderActorContext.setPeerAddresses(peerAddresses);
2018 leaderActorContext.getTermInformation().update(1, leaderActorId);
2020 leader = createBehavior(leaderActorContext);
2022 leaderActor.underlyingActor().setBehavior(leader);
2024 for (int i = 1; i < 6; i++) {
2025 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
2026 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
2027 new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
2028 assertTrue(newBehavior == leader);
2029 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
2032 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
2033 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
2035 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
2036 heartbeats.size() > 1);
2038 // Check if follower-2 got AppendEntries during this time and was not starved
2039 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
2041 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
2042 appendEntries.size() > 1);
2046 public void testReplicationConsensusWithNonVotingFollower() {
2047 logStart("testReplicationConsensusWithNonVotingFollower");
2049 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2050 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2051 new FiniteDuration(1000, TimeUnit.SECONDS));
2053 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2054 leaderActorContext.setCommitIndex(-1);
2055 leaderActorContext.setLastApplied(-1);
2057 String nonVotingFollowerId = "nonvoting-follower";
2058 ActorRef nonVotingFollowerActor = actorFactory.createActor(
2059 MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
2061 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2062 VotingState.NON_VOTING);
2064 leader = new Leader(leaderActorContext);
2065 leaderActorContext.setCurrentBehavior(leader);
2067 // Ignore initial heartbeats
2068 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2069 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2071 MessageCollectorActor.clearMessages(followerActor);
2072 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2073 MessageCollectorActor.clearMessages(leaderActor);
2075 // Send a Replicate message and wait for AppendEntries.
2076 sendReplicate(leaderActorContext, 0);
2078 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2079 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2081 // Send reply only from the voting follower and verify consensus via ApplyState.
2082 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2084 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2086 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2088 MessageCollectorActor.clearMessages(followerActor);
2089 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2090 MessageCollectorActor.clearMessages(leaderActor);
2092 // Send another Replicate message
2093 sendReplicate(leaderActorContext, 1);
2095 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2096 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2097 AppendEntries.class);
2098 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2099 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2101 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2102 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2104 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2106 // Send reply from the voting follower and verify consensus.
2107 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2109 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2113 public void testTransferLeadershipWithFollowerInSync() {
2114 logStart("testTransferLeadershipWithFollowerInSync");
2116 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2117 leaderActorContext.setLastApplied(-1);
2118 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2119 new FiniteDuration(1000, TimeUnit.SECONDS));
2120 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2122 leader = new Leader(leaderActorContext);
2123 leaderActorContext.setCurrentBehavior(leader);
2125 // Initial heartbeat
2126 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2127 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2128 MessageCollectorActor.clearMessages(followerActor);
2130 sendReplicate(leaderActorContext, 0);
2131 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2133 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2134 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2135 MessageCollectorActor.clearMessages(followerActor);
2137 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2138 leader.transferLeadership(mockTransferCohort);
2140 verify(mockTransferCohort, never()).transferComplete();
2141 doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2142 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2143 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2145 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2146 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2148 // Leader should force an election timeout
2149 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2151 verify(mockTransferCohort).transferComplete();
2155 public void testTransferLeadershipWithEmptyLog() {
2156 logStart("testTransferLeadershipWithEmptyLog");
2158 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2159 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2160 new FiniteDuration(1000, TimeUnit.SECONDS));
2161 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2163 leader = new Leader(leaderActorContext);
2164 leaderActorContext.setCurrentBehavior(leader);
2166 // Initial heartbeat
2167 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2168 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2169 MessageCollectorActor.clearMessages(followerActor);
2171 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2172 doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2173 leader.transferLeadership(mockTransferCohort);
2175 verify(mockTransferCohort, never()).transferComplete();
2176 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2177 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2179 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2180 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2182 // Leader should force an election timeout
2183 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2185 verify(mockTransferCohort).transferComplete();
2189 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2190 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2192 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2193 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2194 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2196 leader = new Leader(leaderActorContext);
2197 leaderActorContext.setCurrentBehavior(leader);
2199 // Initial heartbeat
2200 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2201 MessageCollectorActor.clearMessages(followerActor);
2203 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2204 doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2205 leader.transferLeadership(mockTransferCohort);
2207 verify(mockTransferCohort, never()).transferComplete();
2209 // Sync up the follower.
2210 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2211 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2212 MessageCollectorActor.clearMessages(followerActor);
2214 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2215 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2216 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2217 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2218 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2220 // Leader should force an election timeout
2221 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2223 verify(mockTransferCohort).transferComplete();
2227 public void testTransferLeadershipWithFollowerSyncTimeout() {
2228 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2230 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2231 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2232 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2233 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2234 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2236 leader = new Leader(leaderActorContext);
2237 leaderActorContext.setCurrentBehavior(leader);
2239 // Initial heartbeat
2240 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2241 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2242 MessageCollectorActor.clearMessages(followerActor);
2244 sendReplicate(leaderActorContext, 0);
2245 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2247 MessageCollectorActor.clearMessages(followerActor);
2249 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2250 leader.transferLeadership(mockTransferCohort);
2252 verify(mockTransferCohort, never()).transferComplete();
2254 // Send heartbeats to time out the transfer.
2255 for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2256 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2257 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2258 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2261 verify(mockTransferCohort).abortTransfer();
2262 verify(mockTransferCohort, never()).transferComplete();
2263 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2267 public void testReplicationWithPayloadSizeThatExceedsThreshold() {
2268 logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
2270 final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
2271 Arrays.asList(new SimpleReplicatedLogEntry(0, 1,
2272 new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
2273 final MockRaftActorContext.MockPayload largePayload =
2274 new MockRaftActorContext.MockPayload("large", serializedSize);
2276 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2277 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2278 new FiniteDuration(300, TimeUnit.MILLISECONDS));
2279 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
2280 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2281 leaderActorContext.setCommitIndex(-1);
2282 leaderActorContext.setLastApplied(-1);
2284 leader = new Leader(leaderActorContext);
2285 leaderActorContext.setCurrentBehavior(leader);
2287 // Send initial heartbeat reply so follower is marked active
2288 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2289 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2290 MessageCollectorActor.clearMessages(followerActor);
2292 // Send normal payload first to prime commit index.
2293 final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2294 sendReplicate(leaderActorContext, term, 0);
2296 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2297 assertEquals("Entries size", 1, appendEntries.getEntries().size());
2298 assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
2300 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
2301 assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
2302 MessageCollectorActor.clearMessages(followerActor);
2304 // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
2305 sendReplicate(leaderActorContext, term, 1, largePayload);
2307 MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2308 assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
2309 assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
2311 final Identifier slicingId = messageSlice.getIdentifier();
2313 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2314 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
2315 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
2316 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2317 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2318 MessageCollectorActor.clearMessages(followerActor);
2320 // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
2322 // Sleep for the heartbeat interval so AppendEntries is sent.
2323 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2324 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2326 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2328 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2329 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2330 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2331 MessageCollectorActor.clearMessages(followerActor);
2333 // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
2335 leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
2336 messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2337 assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
2339 leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
2341 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
2343 MessageCollectorActor.clearMessages(followerActor);
2345 // Send another normal payload.
2347 sendReplicate(leaderActorContext, term, 2);
2349 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2350 assertEquals("Entries size", 1, appendEntries.getEntries().size());
2351 assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
2352 assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
2356 public void testLargePayloadSlicingExpiration() {
2357 logStart("testLargePayloadSlicingExpiration");
2359 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2360 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2361 new FiniteDuration(100, TimeUnit.MILLISECONDS));
2362 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
2363 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
2364 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2365 leaderActorContext.setCommitIndex(-1);
2366 leaderActorContext.setLastApplied(-1);
2368 final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2369 leader = new Leader(leaderActorContext);
2370 leaderActorContext.setCurrentBehavior(leader);
2372 // Send initial heartbeat reply so follower is marked active
2373 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2374 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2375 MessageCollectorActor.clearMessages(followerActor);
2377 sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
2378 leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
2379 MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2381 // Sleep for at least 3 * election timeout so the slicing state expires.
2382 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2383 .getElectionTimeOutInterval().toMillis() * 3 + 50, TimeUnit.MILLISECONDS);
2384 MessageCollectorActor.clearMessages(followerActor);
2386 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2388 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2389 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2390 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2392 MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
2393 MessageCollectorActor.clearMessages(followerActor);
2395 // Send an AppendEntriesReply - this should restart the slicing.
2397 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2398 .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
2400 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
2402 MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2406 public void testLeaderAddressInAppendEntries() {
2407 logStart("testLeaderAddressInAppendEntries");
2409 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2410 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2411 FiniteDuration.create(50, TimeUnit.MILLISECONDS));
2412 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2413 leaderActorContext.setCommitIndex(-1);
2414 leaderActorContext.setLastApplied(-1);
2416 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
2417 peerId -> leaderActor.path().toString());
2419 leader = new Leader(leaderActorContext);
2420 leaderActorContext.setCurrentBehavior(leader);
2422 // Initial heartbeat shouldn't have the leader address
2424 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2425 assertFalse(appendEntries.getLeaderAddress().isPresent());
2426 MessageCollectorActor.clearMessages(followerActor);
2428 // Send AppendEntriesReply indicating the follower needs the leader address
2430 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
2431 RaftVersions.CURRENT_VERSION));
2433 // Sleep for the heartbeat interval so AppendEntries is sent.
2434 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2435 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2437 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2439 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2440 assertTrue(appendEntries.getLeaderAddress().isPresent());
2441 assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get());
2442 MessageCollectorActor.clearMessages(followerActor);
2444 // Send AppendEntriesReply indicating the follower does not need the leader address
2446 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
2447 RaftVersions.CURRENT_VERSION));
2449 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2450 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2452 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2454 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2455 assertFalse(appendEntries.getLeaderAddress().isPresent());
2459 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
2460 final ActorRef actorRef, final RaftRPC rpc) {
2461 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2462 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2465 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2467 private final long electionTimeOutIntervalMillis;
2468 private final int snapshotChunkSize;
2470 MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
2471 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2472 this.snapshotChunkSize = snapshotChunkSize;
2476 public FiniteDuration getElectionTimeOutInterval() {
2477 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2481 public int getSnapshotChunkSize() {
2482 return snapshotChunkSize;