2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.behaviors;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.protobuf.ByteString;
26 import akka.testkit.TestActorRef;
27 import akka.testkit.javadsl.TestKit;
28 import com.google.common.collect.ImmutableList;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.io.ByteSource;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.io.IOException;
33 import java.io.OutputStream;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.List;
39 import java.util.Optional;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.apache.commons.lang3.SerializationUtils;
43 import org.junit.After;
44 import org.junit.Test;
45 import org.opendaylight.controller.cluster.messaging.MessageSlice;
46 import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
47 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
48 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
49 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
50 import org.opendaylight.controller.cluster.raft.RaftActorContext;
51 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
52 import org.opendaylight.controller.cluster.raft.RaftState;
53 import org.opendaylight.controller.cluster.raft.RaftVersions;
54 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
55 import org.opendaylight.controller.cluster.raft.VotingState;
56 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
57 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
58 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
59 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
60 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
61 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
62 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
63 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
64 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
65 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
66 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
67 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
68 import org.opendaylight.controller.cluster.raft.messages.Payload;
69 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
70 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
71 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
72 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
73 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
74 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
75 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
76 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
77 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
78 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
79 import org.opendaylight.yangtools.concepts.Identifier;
80 import scala.concurrent.duration.FiniteDuration;
// Unit tests for the Leader behavior. The tests construct a Leader on a mock actor context, feed it
// messages through handleMessage, and inspect what the test actors below received.
82 public class LeaderTest extends AbstractLeaderTest<Leader> {
// Identifier constants; FOLLOWER_ID is the id used in simulated follower replies and follower lookups.
84 static final String FOLLOWER_ID = "follower";
85 public static final String LEADER_ID = "leader";
// Test actor passed as the sender of leader-side messages; ApplyState messages are collected from it.
87 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
88 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
// Test actor standing in for the follower; AppendEntries/InstallSnapshot messages are collected from it.
90 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
91 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
// The Leader under test; each test assigns its own instance.
93 private Leader leader;
// Payload version set on the actor context and expected back in the AppendEntries messages.
94 private final short payloadVersion = 5;
98 public void tearDown() {
// Verifies that handleMessage returns null when the Leader is given a message it does not recognize.
107 public void testHandleMessageForUnknownMessage() {
108 logStart("testHandleMessageForUnknownMessage");
110 leader = new Leader(createActorContext());
112 // handleMessage should return null when it receives an unknown message
113 assertNull(leader.handleMessage(followerActor, "foo"));
// Verifies the heartbeat traffic sent to the follower: an empty AppendEntries right after the Leader is
// constructed, then an AppendEntries carrying the outstanding entry on the next SendHeartBeat.
117 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
118 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
120 MockRaftActorContext actorContext = createActorContextWithFollower();
121 actorContext.setCommitIndex(-1);
122 actorContext.setPayloadVersion(payloadVersion);
125 actorContext.getTermInformation().update(term, "");
127 leader = new Leader(actorContext);
128 actorContext.setCurrentBehavior(leader);
130 // Leader should send an immediate heartbeat with no entries as follower is inactive.
131 final long lastIndex = actorContext.getReplicatedLog().lastIndex();
132 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
133 assertEquals("getTerm", term, appendEntries.getTerm());
134 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
135 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
136 assertEquals("Entries size", 0, appendEntries.getEntries().size());
137 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
139 // The follower would normally reply - simulate that explicitly here.
// The reply reports lastIndex - 1, so the assertions further down expect the next AppendEntries to
// carry exactly the entry at lastIndex.
140 leader.handleMessage(followerActor, new AppendEntriesReply(
141 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
142 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard what the follower actor has collected so far so only the next AppendEntries is examined.
144 followerActor.underlyingActor().clear();
146 // Sleep for the heartbeat interval so AppendEntries is sent.
147 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
148 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
150 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
152 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
153 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
154 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
155 assertEquals("Entries size", 1, appendEntries.getEntries().size());
156 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
157 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
158 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
// Convenience overload: replicates an entry at the given index using term 1.
162 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
163 return sendReplicate(actorContext, 1, index);
// Convenience overload: replicates an entry with a MockPayload("foo") at the given term and index.
166 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term,
168 return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
// Builds a log entry from index/term/payload, appends it to the context's replicated log and passes the
// leader a Replicate message for it. Returns whatever behavior handleMessage yields.
171 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
172 final Payload payload) {
173 SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
174 actorContext.getReplicatedLog().append(newEntry);
// The first two Replicate arguments (an actor and an identifier in other tests) are passed as null here.
175 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
// Verifies that a Replicate message makes the leader send the new entry to an up-to-date follower in an
// AppendEntries, and that the commit index stays at the previous last index while no reply has arrived.
179 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
180 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
182 MockRaftActorContext actorContext = createActorContextWithFollower();
185 actorContext.getTermInformation().update(term, "");
187 leader = new Leader(actorContext);
189 // Leader will send an immediate heartbeat - ignore it.
190 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
192 // The follower would normally reply - simulate that explicitly here.
193 long lastIndex = actorContext.getReplicatedLog().lastIndex();
194 leader.handleMessage(followerActor, new AppendEntriesReply(
195 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
196 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the messages collected so far so only the AppendEntries caused by the Replicate is examined.
198 followerActor.underlyingActor().clear();
200 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
202 // State should not change
203 assertTrue(raftBehavior instanceof Leader);
205 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
206 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
207 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
208 assertEquals("Entries size", 1, appendEntries.getEntries().size());
209 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
210 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
211 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// No reply for the new entry has been simulated, so the commit index is expected to be unchanged.
212 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
// Verifies the Raft rule on prior-term entries: entries from an earlier term are not committed by
// counting replicas, but become committed once an entry from the current term is replicated.
216 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() {
217 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
219 MockRaftActorContext actorContext = createActorContextWithFollower();
220 actorContext.setCommitIndex(-1);
221 actorContext.setLastApplied(-1);
223 // The raft context is initialized with a couple log entries. However the commitIndex
224 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
225 // committed and applied. Now it regains leadership with a higher term (2).
226 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
227 long newTerm = prevTerm + 1;
228 actorContext.getTermInformation().update(newTerm, "");
230 leader = new Leader(actorContext);
231 actorContext.setCurrentBehavior(leader);
233 // Leader will send an immediate heartbeat - ignore it.
234 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
236 // The follower replies with the leader's current last index and term, simulating that it is
237 // up to date with the leader.
238 long lastIndex = actorContext.getReplicatedLog().lastIndex();
239 leader.handleMessage(followerActor, new AppendEntriesReply(
240 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
242 // The commit index should not get updated even though consensus was reached. This is b/c the
243 // last entry's term does not match the current term. As per §5.4.1, "Raft never commits log entries
244 // from previous terms by counting replicas".
245 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
247 followerActor.underlyingActor().clear();
249 // Now replicate a new entry with the new term 2.
250 long newIndex = lastIndex + 1;
251 sendReplicate(actorContext, newTerm, newIndex);
253 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
254 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
255 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
256 assertEquals("Entries size", 1, appendEntries.getEntries().size());
257 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
258 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
259 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
261 // The follower replies with success. The leader should now update the commit index to the new index
262 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
263 // prior entries are committed indirectly".
264 leader.handleMessage(followerActor, new AppendEntriesReply(
265 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
267 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
// Verifies that, with the raft policy installed below, the commit index advances to the new entry as soon
// as it is replicated - no follower reply for that entry is simulated before the final assertion.
271 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() {
272 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
274 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): the meaning of the two boolean arguments is defined by createRaftPolicy, which is not
// visible here - confirm against that helper.
275 actorContext.setRaftPolicy(createRaftPolicy(true, true));
278 actorContext.getTermInformation().update(term, "");
280 leader = new Leader(actorContext);
282 // Leader will send an immediate heartbeat - ignore it.
283 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
285 // The follower would normally reply - simulate that explicitly here.
286 long lastIndex = actorContext.getReplicatedLog().lastIndex();
287 leader.handleMessage(followerActor, new AppendEntriesReply(
288 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
289 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
291 followerActor.underlyingActor().clear();
293 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
295 // State should not change
296 assertTrue(raftBehavior instanceof Leader);
298 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
299 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
300 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
301 assertEquals("Entries size", 1, appendEntries.getEntries().size());
302 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
303 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
304 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// Unlike the default-policy test, the commit index is expected to already include the new entry.
305 assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
// Verifies that several Replicate messages in a row produce only one AppendEntries while the follower
// has not replied and the (5 second) heartbeat interval has not elapsed.
309 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
// Log under this test's own name (previously the name of another test was logged by mistake).
310 logStart("testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent");
312 MockRaftActorContext actorContext = createActorContextWithFollower();
// Use a long heartbeat interval so it does not expire during the test.
313 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
315 public FiniteDuration getHeartBeatInterval() {
316 return FiniteDuration.apply(5, TimeUnit.SECONDS);
321 actorContext.getTermInformation().update(term, "");
323 leader = new Leader(actorContext);
325 // Leader will send an immediate heartbeat - ignore it.
326 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
328 // The follower would normally reply - simulate that explicitly here.
329 long lastIndex = actorContext.getReplicatedLog().lastIndex();
330 leader.handleMessage(followerActor, new AppendEntriesReply(
331 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
332 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
334 followerActor.underlyingActor().clear();
// Replicate five consecutive entries without simulating any follower reply in between.
336 for (int i = 0; i < 5; i++) {
337 sendReplicate(actorContext, lastIndex + i + 1);
340 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
341 // We expect only 1 message to be sent because of two reasons,
342 // - an append entries reply was not received
343 // - the heartbeat interval has not expired
344 // In this scenario if multiple messages are sent they would likely be duplicates
345 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
// Verifies that when the follower acknowledges each replicated entry, every Replicate results in its own
// AppendEntries plus a follow-up AppendEntries carrying the advanced commit index, and that a Replicate
// issued while the previous one is still unacknowledged does not produce a further message.
349 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
350 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
352 MockRaftActorContext actorContext = createActorContextWithFollower();
// Use a long heartbeat interval so it does not expire during the test.
353 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
355 public FiniteDuration getHeartBeatInterval() {
356 return FiniteDuration.apply(5, TimeUnit.SECONDS);
361 actorContext.getTermInformation().update(term, "");
363 leader = new Leader(actorContext);
365 // Leader will send an immediate heartbeat - ignore it.
366 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
368 // The follower would normally reply - simulate that explicitly here.
369 long lastIndex = actorContext.getReplicatedLog().lastIndex();
370 leader.handleMessage(followerActor, new AppendEntriesReply(
371 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
372 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
374 followerActor.underlyingActor().clear();
// Replicate three entries, acknowledging each one from the follower before sending the next.
376 for (int i = 0; i < 3; i++) {
377 sendReplicate(actorContext, lastIndex + i + 1);
378 leader.handleMessage(followerActor, new AppendEntriesReply(
379 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
382 // We are expecting six messages here -- for each of the three entries, a request to replicate and a
// consensus-reached message
383 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
384 assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
385 for (int i = 0; i < 3; i++) {
386 assertRequestEntry(lastIndex, allMessages, i);
387 assertCommitEntry(lastIndex, allMessages, i);
390 // Now perform another commit, eliciting a request to persist
391 sendReplicate(actorContext, lastIndex + 3 + 1);
392 allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
393 // This elicits another message for request to replicate
394 assertEquals("The number of request entries collected", 7, allMessages.size());
395 assertRequestEntry(lastIndex, allMessages, 3);
// No reply was simulated for the previous entry, so this Replicate is expected to add no new message.
397 sendReplicate(actorContext, lastIndex + 4 + 1);
398 allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
399 assertEquals("The number of request entries collected", 7, allMessages.size());
// Asserts that the message at odd position 2 * messageNr + 1 is the commit notification for round
// messageNr: it carries no entries and its leader commit is lastIndex + messageNr + 1.
402 private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
403 final int messageNr) {
404 final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
405 assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
406 assertEquals(ImmutableList.of(), commitReq.getEntries());
// Asserts that the message at even position 2 * messageNr is the replication request for round
// messageNr: its leader commit is lastIndex + messageNr and it carries exactly one entry, whose index is
// lastIndex + messageNr + 1.
409 private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
410 final int messageNr) {
411 final AppendEntries req = allMessages.get(2 * messageNr);
412 assertEquals(lastIndex + messageNr, req.getLeaderCommit());
414 final List<ReplicatedLogEntry> entries = req.getEntries();
415 assertEquals(1, entries.size());
// Derive the expected index from lastIndex instead of hard-coding it, so the helper stays correct for
// any starting log size (the previous "messageNr + 2" only held when lastIndex was 1).
416 assertEquals(lastIndex + messageNr + 1, entries.get(0).getIndex());
// Verifies that an entry the follower has not yet acknowledged is sent again when a SendHeartBeat is
// handled after the heartbeat interval has elapsed.
420 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
421 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
423 MockRaftActorContext actorContext = createActorContextWithFollower();
424 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
426 public FiniteDuration getHeartBeatInterval() {
427 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
432 actorContext.getTermInformation().update(term, "");
434 leader = new Leader(actorContext);
436 // Leader will send an immediate heartbeat - ignore it.
437 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
439 // The follower would normally reply - simulate that explicitly here.
440 long lastIndex = actorContext.getReplicatedLog().lastIndex();
441 leader.handleMessage(followerActor, new AppendEntriesReply(
442 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
443 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
445 followerActor.underlyingActor().clear();
447 sendReplicate(actorContext, lastIndex + 1);
449 // Wait slightly longer than heartbeat duration
450 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
452 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
454 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
455 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// Both the original request and the heartbeat re-send must carry the same single entry.
457 assertEquals(1, allMessages.get(0).getEntries().size());
458 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
459 assertEquals(1, allMessages.get(1).getEntries().size());
// Check the second message here; the original re-checked message 0, leaving the duplicate unverified.
460 assertEquals(lastIndex + 1, allMessages.get(1).getEntries().get(0).getIndex());
// Verifies that every SendHeartBeat handled after the heartbeat interval has elapsed results in an
// AppendEntries to the follower, even though nothing new was replicated.
465 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
466 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
468 MockRaftActorContext actorContext = createActorContextWithFollower();
469 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
471 public FiniteDuration getHeartBeatInterval() {
472 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
477 actorContext.getTermInformation().update(term, "");
479 leader = new Leader(actorContext);
481 // Leader will send an immediate heartbeat - ignore it.
482 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
484 // The follower would normally reply - simulate that explicitly here.
485 long lastIndex = actorContext.getReplicatedLog().lastIndex();
486 leader.handleMessage(followerActor, new AppendEntriesReply(
487 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
488 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
490 followerActor.underlyingActor().clear();
// Each iteration sleeps 150 ms, longer than the 100 ms heartbeat interval, before triggering a heartbeat.
492 for (int i = 0; i < 3; i++) {
493 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
494 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
497 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
498 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
// Verifies that a Replicate handled right after a heartbeat is still sent to the follower: the heartbeat
// produces an empty AppendEntries and the Replicate a second one carrying the new entry.
502 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
503 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
505 MockRaftActorContext actorContext = createActorContextWithFollower();
506 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
508 public FiniteDuration getHeartBeatInterval() {
509 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
514 actorContext.getTermInformation().update(term, "");
516 leader = new Leader(actorContext);
518 // Leader will send an immediate heartbeat - ignore it.
519 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
521 // The follower would normally reply - simulate that explicitly here.
522 long lastIndex = actorContext.getReplicatedLog().lastIndex();
523 leader.handleMessage(followerActor, new AppendEntriesReply(
524 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
525 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
527 followerActor.underlyingActor().clear();
// Sleep past the 100 ms heartbeat interval, trigger the heartbeat, then replicate without any delay.
529 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
530 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
531 sendReplicate(actorContext, lastIndex + 1);
533 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
534 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// First message is the empty heartbeat, second carries the replicated entry.
536 assertEquals(0, allMessages.get(0).getEntries().size());
537 assertEquals(1, allMessages.get(1).getEntries().size());
// Verifies that a leader created from a context without a follower commits a replicated entry right away
// and sends ApplyState messages, the last of which carries the new entry and the Replicate identifier.
542 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
543 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
545 MockRaftActorContext actorContext = createActorContext();
547 leader = new Leader(actorContext);
549 actorContext.setLastApplied(0);
551 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
552 long term = actorContext.getTermInformation().getCurrentTerm();
553 ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
554 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
556 actorContext.getReplicatedLog().append(newEntry);
558 final Identifier id = new MockIdentifier("state-id");
559 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
560 new Replicate(leaderActor, id, newEntry, true));
562 // State should not change
563 assertTrue(raftBehavior instanceof Leader);
565 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
567 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
568 // one since lastApplied state is 0.
569 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
570 leaderActor, ApplyState.class);
571 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
// The i-th ApplyState is expected to carry the entry at index i + 1, i.e. everything after lastApplied.
573 for (int i = 0; i <= newLogIndex - 1; i++) {
574 ApplyState applyState = applyStateList.get(i);
575 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
576 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
// The final ApplyState must correspond to the entry that was just replicated.
579 ApplyState last = applyStateList.get((int) newLogIndex - 1);
580 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
581 assertEquals("getIdentifier", id, last.getIdentifier());
// Verifies heartbeat handling while a snapshot install to the follower is in progress: an empty
// AppendEntries is sent while a chunk is outstanding, and an InstallSnapshot is sent once the chunk has
// been marked as successfully sent.
585 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
586 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
588 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Snapshot content; serialized below via toByteString.
590 Map<String, String> leadersSnapshot = new HashMap<>();
591 leadersSnapshot.put("1", "A");
592 leadersSnapshot.put("2", "B");
593 leadersSnapshot.put("3", "C");
// Remove all entries from the leader's log (index 0 onwards).
596 actorContext.getReplicatedLog().removeFrom(0);
598 final int commitIndex = 3;
599 final int snapshotIndex = 2;
600 final int snapshotTerm = 1;
602 // set the snapshot variables in replicatedlog
603 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
604 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
605 actorContext.setCommitIndex(commitIndex);
606 //set follower timeout to 2 mins, helps during debugging
607 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
609 leader = new Leader(actorContext);
// Put the follower's tracked state at the very beginning of the log.
611 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
612 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
614 //update follower timestamp
615 leader.markFollowerActive(FOLLOWER_ID);
// Give the leader a snapshot to install and attach per-follower install state built from the same bytes.
617 ByteString bs = toByteString(leadersSnapshot);
618 leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
619 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
620 -1, null, null), ByteSource.wrap(bs.toByteArray())));
621 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
622 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
623 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
624 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
626 //send first chunk and no InstallSnapshotReply received yet
628 fts.incrementChunkIndex();
630 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
631 TimeUnit.MILLISECONDS);
633 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
635 AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
637 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
639 //InstallSnapshotReply received
640 fts.markSendStatus(true);
642 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
644 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
// The snapshot held by the leader was created with commitIndex as its last included index.
646 assertEquals(commitIndex, is.getLastIncludedIndex());
// Verifies that replicating a new entry while the leader's log starts after a snapshot index puts the
// snapshot manager into the capturing state.
650 public void testSendAppendEntriesSnapshotScenario() {
651 logStart("testSendAppendEntriesSnapshotScenario");
653 final MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): leadersSnapshot is populated but not read in the lines visible here - confirm whether it
// is still needed.
655 Map<String, String> leadersSnapshot = new HashMap<>();
656 leadersSnapshot.put("1", "A");
657 leadersSnapshot.put("2", "B");
658 leadersSnapshot.put("3", "C");
// Remove all entries from the leader's log (index 0 onwards).
661 actorContext.getReplicatedLog().removeFrom(0);
663 final int followersLastIndex = 2;
664 final int snapshotIndex = 3;
665 final int newEntryIndex = 4;
666 final int snapshotTerm = 1;
667 final int currentTerm = 2;
669 // set the snapshot variables in replicatedlog
670 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
671 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
672 actorContext.setCommitIndex(followersLastIndex);
674 leader = new Leader(actorContext);
676 // Leader will send an immediate heartbeat - ignore it.
677 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Append a new entry past the snapshot index.
680 SimpleReplicatedLogEntry entry =
681 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
682 new MockRaftActorContext.MockPayload("D"));
684 actorContext.getReplicatedLog().append(entry);
686 //update follower timestamp
687 leader.markFollowerActive(FOLLOWER_ID);
689 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
690 RaftActorBehavior raftBehavior = leader.handleMessage(
691 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
// The behavior must remain Leader.
693 assertTrue(raftBehavior instanceof Leader);
695 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
// Verifies that, with no snapshot held by the leader, replicating an entry starts a snapshot capture with
// the expected indices/terms, and that a second Replicate during the capture does not start another one.
699 public void testInitiateInstallSnapshot() {
700 logStart("testInitiateInstallSnapshot");
702 MockRaftActorContext actorContext = createActorContextWithFollower();
// Remove all entries from the leader's log (index 0 onwards).
705 actorContext.getReplicatedLog().removeFrom(0);
707 final int followersLastIndex = 2;
708 final int snapshotIndex = 3;
709 final int newEntryIndex = 4;
710 final int snapshotTerm = 1;
711 final int currentTerm = 2;
713 // set the snapshot variables in replicatedlog
714 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
715 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
716 actorContext.setLastApplied(3);
717 actorContext.setCommitIndex(followersLastIndex);
719 leader = new Leader(actorContext);
721 // Leader will send an immediate heartbeat - ignore it.
722 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
724 // set the snapshot as absent and check if capture-snapshot is invoked.
725 leader.setSnapshotHolder(null);
// Append a new entry past the snapshot index.
728 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
729 new MockRaftActorContext.MockPayload("D"));
731 actorContext.getReplicatedLog().append(entry);
733 //update follower timestamp
734 leader.markFollowerActive(FOLLOWER_ID);
736 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
738 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
740 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
// Expected values mirror the setup above: lastApplied 3 / snapshotTerm 1, new entry index 4 / term 2.
742 assertEquals(3, cs.getLastAppliedIndex());
743 assertEquals(1, cs.getLastAppliedTerm());
744 assertEquals(4, cs.getLastIndex());
745 assertEquals(2, cs.getLastTerm());
747 // if an initiate is started again while the first is in progress, it shouldn't initiate another capture
748 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
750 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
// Verifies the forced snapshot path: an AppendEntriesReply from the follower makes the leader start
// a snapshot capture, repeated triggers while the capture is in progress do not start another one,
// persisting the captured state sends an InstallSnapshot to the follower, and a further
// AppendEntriesReply after that does not re-send the chunk.
754 public void testInitiateForceInstallSnapshot() throws Exception {
755 logStart("testInitiateForceInstallSnapshot");
757 MockRaftActorContext actorContext = createActorContextWithFollower();
759 final int followersLastIndex = 2;
760 final int snapshotIndex = -1;
761 final int newEntryIndex = 4;
762 final int snapshotTerm = -1;
763 final int currentTerm = 2;
765 // set the snapshot variables in replicatedlog
766 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
767 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
768 actorContext.setLastApplied(3);
769 actorContext.setCommitIndex(followersLastIndex);
771 actorContext.getReplicatedLog().removeFrom(0);
// Record the Optional<OutputStream> handed to the create-snapshot procedure so the test can
// later write the snapshot bytes to it and pass it on to persist().
773 AtomicReference<Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
774 actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
776 leader = new Leader(actorContext);
777 actorContext.setCurrentBehavior(leader);
779 // Leader will send an immediate heartbeat - ignore it.
780 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
782 // set the snapshot as absent and check if capture-snapshot is invoked.
783 leader.setSnapshotHolder(null);
// Populate the log with entries 0..3 in term 1, followed by the new entry in currentTerm.
785 for (int i = 0; i < 4; i++) {
786 actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
787 new MockRaftActorContext.MockPayload("X" + i)));
791 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
792 new MockRaftActorContext.MockPayload("D"));
794 actorContext.getReplicatedLog().append(entry);
796 //update follower timestamp
797 leader.markFollowerActive(FOLLOWER_ID);
799 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
800 // installed with a SendInstallSnapshot
801 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
802 RaftVersions.CURRENT_VERSION));
804 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
806 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
// Expected values: lastApplied 3 (a term 1 entry) and the last log entry at index 4, term 2.
807 assertEquals(3, cs.getLastAppliedIndex());
808 assertEquals(1, cs.getLastAppliedTerm());
809 assertEquals(4, cs.getLastIndex());
810 assertEquals(2, cs.getLastTerm());
// The create-snapshot procedure must have been called, and with an output stream present.
812 assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
813 assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
815 MessageCollectorActor.clearMessages(followerActor);
817 // Sending Replicate message should not initiate another capture since the first is in progress.
818 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
819 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
821 // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
822 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
823 RaftVersions.CURRENT_VERSION));
824 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
826 // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
827 final byte[] bytes = new byte[]{1, 2, 3};
828 installSnapshotStream.get().get().write(bytes);
829 actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
830 Runtime.getRuntime().totalMemory());
831 MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
833 // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
834 MessageCollectorActor.clearMessages(followerActor);
835 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
836 RaftVersions.CURRENT_VERSION));
837 MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
// Verifies that handling SendInstallSnapshot keeps the Leader behavior and sends the follower an
// InstallSnapshot carrying data, the snapshot's last included index/term and the current term.
842 public void testInstallSnapshot() {
843 logStart("testInstallSnapshot");
845 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Sample state that is serialized below and used as the snapshot payload.
847 Map<String, String> leadersSnapshot = new HashMap<>();
848 leadersSnapshot.put("1", "A");
849 leadersSnapshot.put("2", "B");
850 leadersSnapshot.put("3", "C");
// Start from an empty leader log.
853 actorContext.getReplicatedLog().removeFrom(0);
855 final int lastAppliedIndex = 3;
856 final int snapshotIndex = 2;
857 final int snapshotTerm = 1;
858 final int currentTerm = 2;
860 // set the snapshot variables in replicatedlog
861 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
862 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
863 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
864 actorContext.setCommitIndex(lastAppliedIndex);
865 actorContext.setLastApplied(lastAppliedIndex);
867 leader = new Leader(actorContext);
869 // Initial heartbeat.
870 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Put the follower's tracked indexes at the very start of the log (match -1, next 0).
872 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
873 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
875 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
876 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
877 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
879 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
880 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
882 assertTrue(raftBehavior instanceof Leader);
884 // check if installsnapshot gets called with the correct values.
886 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
887 InstallSnapshot.class);
889 assertNotNull(installSnapshot.getData());
890 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
891 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
893 assertEquals(currentTerm, installSnapshot.getTerm());
// Same scenario as testInstallSnapshot, but with no prior snapshot in the replicated log
// (snapshot index and term are -1), the leader log left as created, and the follower's next
// index set to -1. Verifies that SendInstallSnapshot still results in an InstallSnapshot with the
// snapshot's last included index/term and the current term.
897 public void testForceInstallSnapshot() {
898 logStart("testForceInstallSnapshot");
900 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Sample state that is serialized below and used as the snapshot payload.
902 Map<String, String> leadersSnapshot = new HashMap<>();
903 leadersSnapshot.put("1", "A");
904 leadersSnapshot.put("2", "B");
905 leadersSnapshot.put("3", "C");
907 final int lastAppliedIndex = 3;
908 final int snapshotIndex = -1;
909 final int snapshotTerm = -1;
910 final int currentTerm = 2;
912 // set the snapshot variables in replicatedlog
913 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
914 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
915 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
916 actorContext.setCommitIndex(lastAppliedIndex);
917 actorContext.setLastApplied(lastAppliedIndex);
919 leader = new Leader(actorContext);
921 // Initial heartbeat.
922 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
924 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
925 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
927 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
928 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
929 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
931 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
932 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
934 assertTrue(raftBehavior instanceof Leader);
936 // check if installsnapshot gets called with the correct values.
938 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
939 InstallSnapshot.class);
941 assertNotNull(installSnapshot.getData());
942 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
943 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
945 assertEquals(currentTerm, installSnapshot.getTerm());
// Verifies the leader's bookkeeping after an InstallSnapshotReply for the last chunk: the
// follower's install-snapshot state is cleared, its match index becomes the snapshot's last
// index, its next index the one after that, and the leader no longer holds the snapshot.
949 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
950 logStart("testHandleInstallSnapshotReplyLastChunk");
952 MockRaftActorContext actorContext = createActorContextWithFollower();
954 final int commitIndex = 3;
955 final int snapshotIndex = 2;
956 final int snapshotTerm = 1;
957 final int currentTerm = 2;
959 actorContext.setCommitIndex(commitIndex);
961 leader = new Leader(actorContext);
962 actorContext.setCurrentBehavior(leader);
964 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
965 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
967 // Ignore initial heartbeat.
968 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Sample state that is serialized below and used as the snapshot payload.
970 Map<String, String> leadersSnapshot = new HashMap<>();
971 leadersSnapshot.put("1", "A");
972 leadersSnapshot.put("2", "B");
973 leadersSnapshot.put("3", "C");
975 // set the snapshot variables in replicatedlog
977 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
978 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
979 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
// Give the leader a snapshot whose last index and last applied index both equal commitIndex.
981 ByteString bs = toByteString(leadersSnapshot);
982 leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
983 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
984 -1, null, null), ByteSource.wrap(bs.toByteArray())));
// Attach an install-snapshot state over the same bytes to the follower and advance it until
// its chunk index refers to the last chunk.
985 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
986 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
987 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
988 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
989 while (!fts.isLastChunk(fts.getChunkIndex())) {
991 fts.incrementChunkIndex();
// Empty the leader log before the reply is handled.
995 actorContext.getReplicatedLog().removeFrom(0);
// Reply for the last chunk; the trailing 'true' is presumably the success flag - confirm
// against InstallSnapshotReply.
997 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
998 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
1000 assertTrue(raftBehavior instanceof Leader);
1002 assertEquals(1, leader.followerLogSize());
1003 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
1005 assertNull(fli.getInstallSnapshotState());
1006 assertEquals(commitIndex, fli.getMatchIndex());
1007 assertEquals(commitIndex + 1, fli.getNextIndex());
1008 assertFalse(leader.hasSnapshot());
// Verifies chunk-by-chunk snapshot transfer driven by InstallSnapshotReply: each reply for the
// previously sent chunk makes the leader send the next InstallSnapshot chunk, and a reply
// received after the third chunk was sent does not produce another InstallSnapshot.
1012 public void testSendSnapshotfromInstallSnapshotReply() {
1013 logStart("testSendSnapshotfromInstallSnapshotReply");
1015 MockRaftActorContext actorContext = createActorContextWithFollower();
1017 final int commitIndex = 3;
1018 final int snapshotIndex = 2;
1019 final int snapshotTerm = 1;
1020 final int currentTerm = 2;
// Config with a custom snapshot chunk size; the assertions below expect the snapshot to be
// split into 3 chunks.
1022 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
1024 public int getSnapshotChunkSize() {
// Long intervals, presumably so that no timer-driven heartbeat or isolation check interferes
// with the message sequence asserted below.
1028 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1029 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1031 actorContext.setConfigParams(configParams);
1032 actorContext.setCommitIndex(commitIndex);
1034 leader = new Leader(actorContext);
1035 actorContext.setCurrentBehavior(leader);
1037 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1038 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Sample state that is serialized below and used as the snapshot payload.
1040 Map<String, String> leadersSnapshot = new HashMap<>();
1041 leadersSnapshot.put("1", "A");
1042 leadersSnapshot.put("2", "B");
1043 leadersSnapshot.put("3", "C");
1045 // set the snapshot variables in replicatedlog
1046 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1047 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1048 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1050 ByteString bs = toByteString(leadersSnapshot);
1051 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1052 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
// Start the install: the first chunk (index 1 of 3) is sent immediately.
1055 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1057 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1058 InstallSnapshot.class);
1060 assertEquals(1, installSnapshot.getChunkIndex());
1061 assertEquals(3, installSnapshot.getTotalChunks());
// Reply for chunk 1 -> chunk 2 is sent.
1063 followerActor.underlyingActor().clear();
1064 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1065 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1067 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1069 assertEquals(2, installSnapshot.getChunkIndex());
1070 assertEquals(3, installSnapshot.getTotalChunks());
// Reply for chunk 2 -> another InstallSnapshot is sent (its chunk index is not asserted here).
1072 followerActor.underlyingActor().clear();
1073 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1074 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1076 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1078 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1079 followerActor.underlyingActor().clear();
1080 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1081 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1083 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1085 assertNull(installSnapshot);
// Verifies that after an InstallSnapshotReply carrying chunk index -1 and a 'false' flag, the
// next heartbeat makes the leader send chunk 1 of the snapshot again.
1090 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() {
1091 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1093 MockRaftActorContext actorContext = createActorContextWithFollower();
1095 final int commitIndex = 3;
1096 final int snapshotIndex = 2;
1097 final int snapshotTerm = 1;
1098 final int currentTerm = 2;
// Config with a custom snapshot chunk size; the assertions below expect the snapshot to be
// split into 3 chunks.
1100 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1102 public int getSnapshotChunkSize() {
1107 actorContext.setCommitIndex(commitIndex);
1109 leader = new Leader(actorContext);
1111 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1112 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Sample state that is serialized below and used as the snapshot payload.
1114 Map<String, String> leadersSnapshot = new HashMap<>();
1115 leadersSnapshot.put("1", "A");
1116 leadersSnapshot.put("2", "B");
1117 leadersSnapshot.put("3", "C");
1119 // set the snapshot variables in replicatedlog
1120 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1121 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1122 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1124 ByteString bs = toByteString(leadersSnapshot);
1125 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1126 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
// NOTE(review): the reason for this fixed 1 second pause is not evident from the test itself -
// confirm it is still required.
1129 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1130 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1132 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1133 InstallSnapshot.class);
1135 assertEquals(1, installSnapshot.getChunkIndex());
1136 assertEquals(3, installSnapshot.getTotalChunks());
1138 followerActor.underlyingActor().clear();
// Reply with an invalid chunk index (-1) and 'false' as the last argument.
1140 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1141 FOLLOWER_ID, -1, false));
// Wait one heartbeat interval before delivering the heartbeat trigger by hand.
1143 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1144 TimeUnit.MILLISECONDS);
1146 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
// The first chunk is expected to be sent again.
1148 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1150 assertEquals(1, installSnapshot.getChunkIndex());
1151 assertEquals(3, installSnapshot.getTotalChunks());
// Verifies the last-chunk hash code carried by InstallSnapshot messages: the first chunk carries
// LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE, and the second chunk carries
// Arrays.hashCode of the first chunk's data.
1155 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() {
1156 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1158 MockRaftActorContext actorContext = createActorContextWithFollower();
1160 final int commitIndex = 3;
1161 final int snapshotIndex = 2;
1162 final int snapshotTerm = 1;
1163 final int currentTerm = 2;
// Config with a custom snapshot chunk size; the assertions below expect the snapshot to be
// split into 3 chunks.
1165 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1167 public int getSnapshotChunkSize() {
1172 actorContext.setCommitIndex(commitIndex);
1174 leader = new Leader(actorContext);
1176 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1177 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Sample state that is serialized below and used as the snapshot payload.
1179 Map<String, String> leadersSnapshot = new HashMap<>();
1180 leadersSnapshot.put("1", "A");
1181 leadersSnapshot.put("2", "B");
1182 leadersSnapshot.put("3", "C");
1184 // set the snapshot variables in replicatedlog
1185 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1186 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1187 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1189 ByteString bs = toByteString(leadersSnapshot);
1190 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1191 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1194 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1196 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1197 InstallSnapshot.class);
1199 assertEquals(1, installSnapshot.getChunkIndex());
1200 assertEquals(3, installSnapshot.getTotalChunks());
1201 assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1202 installSnapshot.getLastChunkHashCode().getAsInt());
// Remember the hash of the first chunk's data; the next chunk must reference it.
1204 final int hashCode = Arrays.hashCode(installSnapshot.getData());
1206 followerActor.underlyingActor().clear();
// Reply for chunk 1 so that the leader sends chunk 2.
1208 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1209 FOLLOWER_ID, 1, true));
1211 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1213 assertEquals(2, installSnapshot.getChunkIndex());
1214 assertEquals(3, installSnapshot.getTotalChunks());
1215 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().getAsInt());
// Exercises LeaderInstallSnapshotState directly with a chunk size of 50: walks the serialized
// snapshot in steps of 50 bytes and checks each chunk's length and index, then checks the
// total chunk count.
1219 public void testLeaderInstallSnapshotState() throws IOException {
1220 logStart("testLeaderInstallSnapshotState");
// Sample state that is serialized below and used as the snapshot payload.
1222 Map<String, String> leadersSnapshot = new HashMap<>();
1223 leadersSnapshot.put("1", "A");
1224 leadersSnapshot.put("2", "B");
1225 leadersSnapshot.put("3", "C");
1227 ByteString bs = toByteString(leadersSnapshot);
1228 byte[] barray = bs.toByteArray();
1230 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1231 fts.setSnapshotBytes(ByteSource.wrap(barray));
1233 assertEquals(bs.size(), barray.length);
// i is the start offset of the current chunk; length is its exclusive end offset, clamped to
// the size of the array for the final, possibly shorter, chunk.
1236 for (int i = 0; i < barray.length; i = i + 50) {
1237 int length = i + 50;
1240 if (i + 50 > barray.length) {
1241 length = barray.length;
1244 byte[] chunk = fts.getNextChunk();
1245 assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1246 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
// Mark the chunk as sent and move on unless this was the last chunk.
1248 fts.markSendStatus(true);
1249 if (!fts.isLastChunk(chunkIndex)) {
1250 fts.incrementChunkIndex();
1254 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
// Creates a new Leader behavior backed by the given actor context.
1259 protected Leader createBehavior(final RaftActorContext actorContext) {
1260 return new Leader(actorContext);
// Creates an actor context for the leader, bound to leaderActor.
1264 protected MockRaftActorContext createActorContext() {
1265 return createActorContext(leaderActor);
// Creates an actor context with the leader's id (LEADER_ID), bound to the given actor.
1269 protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
1270 return createActorContext(LEADER_ID, actorRef);
// Builds a MockRaftActorContext for the given member id and actor, configured with a 50 ms
// heartbeat interval, an election timeout factor of 100000 and this test's payloadVersion.
1273 private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
1274 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1275 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
// Very large factor, presumably so that no election timeout fires while a test is running.
1276 configParams.setElectionTimeoutFactor(100000);
1277 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1278 context.setConfigParams(configParams);
1279 context.setPayloadVersion(payloadVersion);
// Creates the leader's actor context and registers followerActor's path as the address of the
// single peer FOLLOWER_ID.
1283 private MockRaftActorContext createActorContextWithFollower() {
1284 MockRaftActorContext actorContext = createActorContext();
1285 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1286 followerActor.path().toString()).build());
1287 return actorContext;
// Creates an actor context for FOLLOWER_ID bound to followerActor, with leaderActor's path
// registered as the address of the single peer LEADER_ID.
1290 private MockRaftActorContext createFollowerActorContextWithLeader() {
1291 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Install a fresh config (election timeout factor 10000) in place of the one that
// createActorContext set on the context.
1292 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1293 followerConfig.setElectionTimeoutFactor(10000);
1294 followerActorContext.setConfigParams(followerConfig);
1295 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1296 return followerActorContext;
// Verifies the first exchange between a newly created Leader whose commit index (1) is lower than
// its last log index and a Follower holding an identically built log: the initial AppendEntries
// carries leader commit -1, no entries and prevLogIndex 0, and the follower's reply reports
// last log index 2 in term 1.
1300 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() {
1301 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1303 final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1305 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Let followerActor process incoming messages with a real Follower behavior.
1307 Follower follower = new Follower(followerActorContext);
1308 followerActor.underlyingActor().setBehavior(follower);
1309 followerActorContext.setCurrentBehavior(follower);
1311 Map<String, String> peerAddresses = new HashMap<>();
1312 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1314 leaderActorContext.setPeerAddresses(peerAddresses);
// NOTE(review): the log is replaced wholesale by setReplicatedLog just below, so this
// removeFrom(0) looks redundant - confirm.
1316 leaderActorContext.getReplicatedLog().removeFrom(0);
// Leader log built with createEntries(0, 3, 1); the reply asserted below reports last index 2
// and term 1 for the follower's identically built log.
1319 leaderActorContext.setReplicatedLog(
1320 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1322 leaderActorContext.setCommitIndex(1);
1324 followerActorContext.getReplicatedLog().removeFrom(0);
1326 // follower too has the exact same log entries and has the same commit index
1327 followerActorContext.setReplicatedLog(
1328 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1330 followerActorContext.setCommitIndex(1);
1332 leader = new Leader(leaderActorContext);
1333 leaderActorContext.setCurrentBehavior(leader);
// Initial heartbeat sent on leader creation.
1335 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1337 assertEquals(-1, appendEntries.getLeaderCommit());
1338 assertEquals(0, appendEntries.getEntries().size());
1339 assertEquals(0, appendEntries.getPrevLogIndex());
1341 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1342 leaderActor, AppendEntriesReply.class);
1344 assertEquals(2, appendEntriesReply.getLogLastIndex());
1345 assertEquals(1, appendEntriesReply.getLogLastTerm());
1347 // follower returns its next index
// NOTE(review): the two assertions below repeat the pair directly above (last index / last term)
// and do not check a "next index" - confirm what was intended here.
1348 assertEquals(2, appendEntriesReply.getLogLastIndex());
1349 assertEquals(1, appendEntriesReply.getLogLastTerm());
// Verifies the exchange between a newly created Leader with commit index 1 and a Follower that
// holds an identically built log but commit index 2: the initial AppendEntries carries leader
// commit -1 and prevLogIndex 0; after the leader handles the follower's reply, the next heartbeat
// carries leader commit 2 and prevLogIndex 2, and the follower's commit index remains 2.
1355 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() {
1356 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1358 final MockRaftActorContext leaderActorContext = createActorContext();
1360 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1361 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
// Let followerActor process incoming messages with a real Follower behavior.
1363 Follower follower = new Follower(followerActorContext);
1364 followerActor.underlyingActor().setBehavior(follower);
1365 followerActorContext.setCurrentBehavior(follower);
1367 Map<String, String> leaderPeerAddresses = new HashMap<>();
1368 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1370 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1372 leaderActorContext.getReplicatedLog().removeFrom(0);
1374 leaderActorContext.setReplicatedLog(
1375 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1377 leaderActorContext.setCommitIndex(1);
1379 followerActorContext.getReplicatedLog().removeFrom(0);
1381 followerActorContext.setReplicatedLog(
1382 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1384 // follower has the same log entries but its commit index > leaders commit index
1385 followerActorContext.setCommitIndex(2);
1387 leader = new Leader(leaderActorContext);
1389 // Initial heartbeat
1390 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1392 assertEquals(-1, appendEntries.getLeaderCommit());
1393 assertEquals(0, appendEntries.getEntries().size());
1394 assertEquals(0, appendEntries.getPrevLogIndex());
1396 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1397 leaderActor, AppendEntriesReply.class);
1399 assertEquals(2, appendEntriesReply.getLogLastIndex());
1400 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): leaderActor is given the Follower behavior here, whereas the sibling
// testHandleAppendEntriesReplyFailure* tests install the leader on leaderActor - confirm this is
// intentional. The reply is delivered to the leader by hand on the next line either way.
1402 leaderActor.underlyingActor().setBehavior(follower);
1403 leader.handleMessage(followerActor, appendEntriesReply);
// Drop everything collected so far so the assertions below only see the next round.
1405 leaderActor.underlyingActor().clear();
1406 followerActor.underlyingActor().clear();
// Wait one heartbeat interval before delivering the heartbeat trigger by hand.
1408 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1409 TimeUnit.MILLISECONDS);
1411 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1413 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1415 assertEquals(2, appendEntries.getLeaderCommit());
1416 assertEquals(0, appendEntries.getEntries().size());
1417 assertEquals(2, appendEntries.getPrevLogIndex());
1419 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1421 assertEquals(2, appendEntriesReply.getLogLastIndex());
1422 assertEquals(1, appendEntriesReply.getLogLastTerm());
1424 assertEquals(2, followerActorContext.getCommitIndex());
// Verifies catch-up of a follower whose log is shorter than the leader's: after the leader handles
// the follower's reply to the initial AppendEntries, it sends the two missing entries (indexes 1
// and 2) in one AppendEntries with prevLogIndex 0, and the follower applies both and ends up with
// commit index 2 and last log index 2.
1430 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1431 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1433 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// Very long heartbeat interval, presumably so that no timer-driven heartbeat interferes with
// the message sequence asserted below.
1434 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1435 new FiniteDuration(1000, TimeUnit.SECONDS));
// Leader log built with createEntries(0, 3, 1), fully committed and applied up to index 2.
1437 leaderActorContext.setReplicatedLog(
1438 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1439 long leaderCommitIndex = 2;
1440 leaderActorContext.setCommitIndex(leaderCommitIndex);
1441 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the entries the follower is missing, for the data assertions below.
1443 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1444 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1446 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// Follower log built with createEntries(0, 1, 1), committed and applied up to index 0.
1448 followerActorContext.setReplicatedLog(
1449 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1450 followerActorContext.setCommitIndex(0);
1451 followerActorContext.setLastApplied(0);
1453 Follower follower = new Follower(followerActorContext);
1454 followerActor.underlyingActor().setBehavior(follower);
1456 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply to it, then clear both collectors.
1458 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1459 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1460 AppendEntriesReply.class);
1462 MessageCollectorActor.clearMessages(followerActor);
1463 MessageCollectorActor.clearMessages(leaderActor);
1465 // Verify initial AppendEntries sent.
1466 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1467 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1468 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
// From here on leaderActor processes incoming messages with the leader behavior.
1470 leaderActor.underlyingActor().setBehavior(leader);
// Hand the captured reply to the leader; it is expected to respond with the missing entries.
1472 leader.handleMessage(followerActor, appendEntriesReply);
1474 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1475 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1477 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1478 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1479 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1481 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1482 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1483 appendEntries.getEntries().get(0).getData());
1484 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1485 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1486 appendEntries.getEntries().get(1).getData());
// The leader's view of the follower: next index is one past the leader's last entry.
1488 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1489 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
// The follower is expected to emit one ApplyState per newly received entry.
1491 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1493 ApplyState applyState = applyStateList.get(0);
1494 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1495 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1496 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1497 applyState.getReplicatedLogEntry().getData());
1499 applyState = applyStateList.get(1);
1500 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1501 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1502 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1503 applyState.getReplicatedLogEntry().getData());
1505 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1506 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1510 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1511 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1513 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1514 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1515 new FiniteDuration(1000, TimeUnit.SECONDS));
1517 leaderActorContext.setReplicatedLog(
1518 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1519 long leaderCommitIndex = 1;
1520 leaderActorContext.setCommitIndex(leaderCommitIndex);
1521 leaderActorContext.setLastApplied(leaderCommitIndex);
1523 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1524 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1526 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1528 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1529 followerActorContext.setCommitIndex(-1);
1530 followerActorContext.setLastApplied(-1);
1532 Follower follower = new Follower(followerActorContext);
1533 followerActor.underlyingActor().setBehavior(follower);
1534 followerActorContext.setCurrentBehavior(follower);
1536 leader = new Leader(leaderActorContext);
1538 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1539 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1540 AppendEntriesReply.class);
1542 MessageCollectorActor.clearMessages(followerActor);
1543 MessageCollectorActor.clearMessages(leaderActor);
1545 // Verify initial AppendEntries sent with the leader's current commit index.
1546 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1547 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1548 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1550 leaderActor.underlyingActor().setBehavior(leader);
1551 leaderActorContext.setCurrentBehavior(leader);
1553 leader.handleMessage(followerActor, appendEntriesReply);
1555 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1556 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1558 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1559 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1560 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1562 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1563 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1564 appendEntries.getEntries().get(0).getData());
1565 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1566 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1567 appendEntries.getEntries().get(1).getData());
1569 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1570 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1572 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1574 ApplyState applyState = applyStateList.get(0);
1575 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1576 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1577 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1578 applyState.getReplicatedLogEntry().getData());
1580 applyState = applyStateList.get(1);
1581 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1582 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1583 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1584 applyState.getReplicatedLogEntry().getData());
1586 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1587 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1591 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1592 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1594 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1595 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1596 new FiniteDuration(1000, TimeUnit.SECONDS));
1598 leaderActorContext.setReplicatedLog(
1599 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1600 long leaderCommitIndex = 1;
1601 leaderActorContext.setCommitIndex(leaderCommitIndex);
1602 leaderActorContext.setLastApplied(leaderCommitIndex);
1604 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1605 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1607 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1609 followerActorContext.setReplicatedLog(
1610 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1611 followerActorContext.setCommitIndex(-1);
1612 followerActorContext.setLastApplied(-1);
1614 Follower follower = new Follower(followerActorContext);
1615 followerActor.underlyingActor().setBehavior(follower);
1616 followerActorContext.setCurrentBehavior(follower);
1618 leader = new Leader(leaderActorContext);
1620 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1621 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1622 AppendEntriesReply.class);
1624 MessageCollectorActor.clearMessages(followerActor);
1625 MessageCollectorActor.clearMessages(leaderActor);
1627 // Verify initial AppendEntries sent with the leader's current commit index.
1628 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1629 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1630 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1632 leaderActor.underlyingActor().setBehavior(leader);
1633 leaderActorContext.setCurrentBehavior(leader);
1635 leader.handleMessage(followerActor, appendEntriesReply);
1637 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1638 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1640 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1641 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1642 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1644 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1645 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1646 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1647 appendEntries.getEntries().get(0).getData());
1648 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1649 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1650 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1651 appendEntries.getEntries().get(1).getData());
1653 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1654 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1656 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1658 ApplyState applyState = applyStateList.get(0);
1659 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1660 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1661 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1662 applyState.getReplicatedLogEntry().getData());
1664 applyState = applyStateList.get(1);
1665 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1666 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1667 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1668 applyState.getReplicatedLogEntry().getData());
1670 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1671 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1672 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1676 public void testHandleAppendEntriesReplyWithNewerTerm() {
1677 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1679 MockRaftActorContext leaderActorContext = createActorContext();
1680 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1681 new FiniteDuration(10000, TimeUnit.SECONDS));
1683 leaderActorContext.setReplicatedLog(
1684 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1686 leader = new Leader(leaderActorContext);
1687 leaderActor.underlyingActor().setBehavior(leader);
1688 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1690 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1691 AppendEntriesReply.class);
1693 assertEquals(false, appendEntriesReply.isSuccess());
1694 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1696 MessageCollectorActor.clearMessages(leaderActor);
1700 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1701 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1703 MockRaftActorContext leaderActorContext = createActorContext();
1704 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1705 new FiniteDuration(10000, TimeUnit.SECONDS));
1707 leaderActorContext.setReplicatedLog(
1708 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1709 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1711 leader = new Leader(leaderActorContext);
1712 leaderActor.underlyingActor().setBehavior(leader);
1713 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1715 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1716 AppendEntriesReply.class);
1718 assertEquals(false, appendEntriesReply.isSuccess());
1719 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1721 MessageCollectorActor.clearMessages(leaderActor);
1725 public void testHandleAppendEntriesReplySuccess() {
1726 logStart("testHandleAppendEntriesReplySuccess");
1728 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1730 leaderActorContext.setReplicatedLog(
1731 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1733 leaderActorContext.setCommitIndex(1);
1734 leaderActorContext.setLastApplied(1);
1735 leaderActorContext.getTermInformation().update(1, "leader");
1737 leader = new Leader(leaderActorContext);
1739 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1741 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1742 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1744 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1746 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1748 assertEquals(RaftState.Leader, raftActorBehavior.state());
1750 assertEquals(2, leaderActorContext.getCommitIndex());
1752 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1753 leaderActor, ApplyJournalEntries.class);
1755 assertEquals(2, leaderActorContext.getLastApplied());
1757 assertEquals(2, applyJournalEntries.getToIndex());
1759 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1762 assertEquals(1,applyStateList.size());
1764 ApplyState applyState = applyStateList.get(0);
1766 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1768 assertEquals(2, followerInfo.getMatchIndex());
1769 assertEquals(3, followerInfo.getNextIndex());
1770 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1771 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1775 public void testHandleAppendEntriesReplyUnknownFollower() {
1776 logStart("testHandleAppendEntriesReplyUnknownFollower");
1778 MockRaftActorContext leaderActorContext = createActorContext();
1780 leader = new Leader(leaderActorContext);
1782 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1784 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1786 assertEquals(RaftState.Leader, raftActorBehavior.state());
1790 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1791 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1793 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1794 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1795 new FiniteDuration(1000, TimeUnit.SECONDS));
1796 // Note: the size here depends on estimate
1797 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(246);
1799 leaderActorContext.setReplicatedLog(
1800 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1801 long leaderCommitIndex = 3;
1802 leaderActorContext.setCommitIndex(leaderCommitIndex);
1803 leaderActorContext.setLastApplied(leaderCommitIndex);
1805 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1806 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1807 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1808 final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1810 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1812 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1813 followerActorContext.setCommitIndex(-1);
1814 followerActorContext.setLastApplied(-1);
1816 Follower follower = new Follower(followerActorContext);
1817 followerActor.underlyingActor().setBehavior(follower);
1818 followerActorContext.setCurrentBehavior(follower);
1820 leader = new Leader(leaderActorContext);
1822 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1823 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1824 AppendEntriesReply.class);
1826 MessageCollectorActor.clearMessages(followerActor);
1827 MessageCollectorActor.clearMessages(leaderActor);
1829 // Verify initial AppendEntries sent with the leader's current commit index.
1830 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1831 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1832 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1834 leaderActor.underlyingActor().setBehavior(leader);
1835 leaderActorContext.setCurrentBehavior(leader);
1837 leader.handleMessage(followerActor, appendEntriesReply);
1839 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1840 AppendEntries.class, 2);
1841 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1843 appendEntries = appendEntriesList.get(0);
1844 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1845 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1846 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1848 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1849 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1850 appendEntries.getEntries().get(0).getData());
1851 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1852 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1853 appendEntries.getEntries().get(1).getData());
1855 appendEntries = appendEntriesList.get(1);
1856 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1857 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1858 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1860 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1861 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1862 appendEntries.getEntries().get(0).getData());
1863 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1864 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1865 appendEntries.getEntries().get(1).getData());
1867 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1868 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1870 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1872 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1873 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1877 public void testHandleRequestVoteReply() {
1878 logStart("testHandleRequestVoteReply");
1880 MockRaftActorContext leaderActorContext = createActorContext();
1882 leader = new Leader(leaderActorContext);
1884 // Should be a no-op.
1885 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1886 new RequestVoteReply(1, true));
1888 assertEquals(RaftState.Leader, raftActorBehavior.state());
1890 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1892 assertEquals(RaftState.Leader, raftActorBehavior.state());
1896 public void testIsolatedLeaderCheckNoFollowers() {
1897 logStart("testIsolatedLeaderCheckNoFollowers");
1899 MockRaftActorContext leaderActorContext = createActorContext();
1901 leader = new Leader(leaderActorContext);
1902 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1903 assertTrue(newBehavior instanceof Leader);
1907 public void testIsolatedLeaderCheckNoVotingFollowers() {
1908 logStart("testIsolatedLeaderCheckNoVotingFollowers");
1910 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1911 Follower follower = new Follower(followerActorContext);
1912 followerActor.underlyingActor().setBehavior(follower);
1914 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1915 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1916 new FiniteDuration(1000, TimeUnit.SECONDS));
1917 leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1919 leader = new Leader(leaderActorContext);
1920 leader.getFollower(FOLLOWER_ID).markFollowerActive();
1921 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1922 assertTrue("Expected Leader", newBehavior instanceof Leader);
1925 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
1926 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1927 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1929 MockRaftActorContext leaderActorContext = createActorContext();
1931 Map<String, String> peerAddresses = new HashMap<>();
1932 peerAddresses.put("follower-1", followerActor1.path().toString());
1933 peerAddresses.put("follower-2", followerActor2.path().toString());
1935 leaderActorContext.setPeerAddresses(peerAddresses);
1936 leaderActorContext.setRaftPolicy(raftPolicy);
1938 leader = new Leader(leaderActorContext);
1940 leader.markFollowerActive("follower-1");
1941 leader.markFollowerActive("follower-2");
1942 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1943 assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1945 // kill 1 follower and verify if that got killed
1946 final TestKit probe = new TestKit(getSystem());
1947 probe.watch(followerActor1);
1948 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1949 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1950 assertEquals(termMsg1.getActor(), followerActor1);
1952 leader.markFollowerInActive("follower-1");
1953 leader.markFollowerActive("follower-2");
1954 newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1955 assertTrue("Behavior not instance of Leader when majority of followers are active",
1956 newBehavior instanceof Leader);
1958 // kill 2nd follower and leader should change to Isolated leader
1959 followerActor2.tell(PoisonPill.getInstance(), null);
1960 probe.watch(followerActor2);
1961 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1962 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1963 assertEquals(termMsg2.getActor(), followerActor2);
1965 leader.markFollowerInActive("follower-2");
1966 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1970 public void testIsolatedLeaderCheckTwoFollowers() {
1971 logStart("testIsolatedLeaderCheckTwoFollowers");
1973 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1975 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1976 newBehavior instanceof IsolatedLeader);
1980 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() {
1981 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1983 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1985 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1986 newBehavior instanceof Leader);
1990 public void testLaggingFollowerStarvation() {
1991 logStart("testLaggingFollowerStarvation");
1993 String leaderActorId = actorFactory.generateActorId("leader");
1994 String follower1ActorId = actorFactory.generateActorId("follower");
1995 String follower2ActorId = actorFactory.generateActorId("follower");
1997 final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1998 final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
2000 MockRaftActorContext leaderActorContext =
2001 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
2003 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
2004 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
2005 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
2007 leaderActorContext.setConfigParams(configParams);
2009 leaderActorContext.setReplicatedLog(
2010 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
2012 Map<String, String> peerAddresses = new HashMap<>();
2013 peerAddresses.put(follower1ActorId,
2014 follower1Actor.path().toString());
2015 peerAddresses.put(follower2ActorId,
2016 follower2Actor.path().toString());
2018 leaderActorContext.setPeerAddresses(peerAddresses);
2019 leaderActorContext.getTermInformation().update(1, leaderActorId);
2021 leader = createBehavior(leaderActorContext);
2023 leaderActor.underlyingActor().setBehavior(leader);
2025 for (int i = 1; i < 6; i++) {
2026 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
2027 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
2028 new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
2029 assertTrue(newBehavior == leader);
2030 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
2033 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
2034 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
2036 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
2037 heartbeats.size() > 1);
2039 // Check if follower-2 got AppendEntries during this time and was not starved
2040 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
2042 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
2043 appendEntries.size() > 1);
2047 public void testReplicationConsensusWithNonVotingFollower() {
2048 logStart("testReplicationConsensusWithNonVotingFollower");
2050 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2051 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2052 new FiniteDuration(1000, TimeUnit.SECONDS));
2054 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2055 leaderActorContext.setCommitIndex(-1);
2056 leaderActorContext.setLastApplied(-1);
2058 String nonVotingFollowerId = "nonvoting-follower";
2059 ActorRef nonVotingFollowerActor = actorFactory.createActor(
2060 MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
2062 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2063 VotingState.NON_VOTING);
2065 leader = new Leader(leaderActorContext);
2066 leaderActorContext.setCurrentBehavior(leader);
2068 // Ignore initial heartbeats
2069 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2070 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2072 MessageCollectorActor.clearMessages(followerActor);
2073 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2074 MessageCollectorActor.clearMessages(leaderActor);
2076 // Send a Replicate message and wait for AppendEntries.
2077 sendReplicate(leaderActorContext, 0);
2079 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2080 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2082 // Send reply only from the voting follower and verify consensus via ApplyState.
2083 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2085 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2087 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2089 MessageCollectorActor.clearMessages(followerActor);
2090 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2091 MessageCollectorActor.clearMessages(leaderActor);
2093 // Send another Replicate message
2094 sendReplicate(leaderActorContext, 1);
2096 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2097 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2098 AppendEntries.class);
2099 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2100 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2102 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2103 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2105 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2107 // Send reply from the voting follower and verify consensus.
2108 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2110 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2114 public void testTransferLeadershipWithFollowerInSync() {
2115 logStart("testTransferLeadershipWithFollowerInSync");
2117 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2118 leaderActorContext.setLastApplied(-1);
2119 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2120 new FiniteDuration(1000, TimeUnit.SECONDS));
2121 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2123 leader = new Leader(leaderActorContext);
2124 leaderActorContext.setCurrentBehavior(leader);
2126 // Initial heartbeat
2127 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2128 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2129 MessageCollectorActor.clearMessages(followerActor);
2131 sendReplicate(leaderActorContext, 0);
2132 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2134 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2135 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2136 MessageCollectorActor.clearMessages(followerActor);
2138 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2139 leader.transferLeadership(mockTransferCohort);
2141 verify(mockTransferCohort, never()).transferComplete();
2142 doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2143 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2144 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2146 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2147 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2149 // Leader should force an election timeout
2150 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2152 verify(mockTransferCohort).transferComplete();
2156 public void testTransferLeadershipWithEmptyLog() {
2157 logStart("testTransferLeadershipWithEmptyLog");
2159 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2160 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2161 new FiniteDuration(1000, TimeUnit.SECONDS));
2162 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2164 leader = new Leader(leaderActorContext);
2165 leaderActorContext.setCurrentBehavior(leader);
2167 // Initial heartbeat
2168 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2169 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2170 MessageCollectorActor.clearMessages(followerActor);
2172 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2173 doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2174 leader.transferLeadership(mockTransferCohort);
2176 verify(mockTransferCohort, never()).transferComplete();
2177 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2178 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2180 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2181 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2183 // Leader should force an election timeout
2184 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2186 verify(mockTransferCohort).transferComplete();
// Test: leadership transfer is requested before the follower has caught up.
// The cohort must NOT see transferComplete() right after the request; only after the follower's
// second AppendEntriesReply (carrying 1/1 in what are presumably the last-log-index/term slots -
// confirm against AppendEntriesReply) does the test expect a TimeoutNow at the follower and
// transferComplete() on the cohort.
2190 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2191 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2193 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// Fixed 200 ms heartbeat interval; the sleep further down is derived from this value.
2194 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2195 new FiniteDuration(200, TimeUnit.MILLISECONDS));
// Create the Leader behaviour under test and register it as the context's current behaviour.
2197 leader = new Leader(leaderActorContext);
2198 leaderActorContext.setCurrentBehavior(leader);
2200 // Initial heartbeat
2201 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2202 MessageCollectorActor.clearMessages(followerActor);
// The cohort is a Mockito mock whose getRequestedFollowerId() is stubbed to Optional.empty()
// (presumably meaning no particular follower was requested - confirm against the cohort class).
2204 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2205 doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2206 leader.transferLeadership(mockTransferCohort);
// The transfer must not be reported complete immediately after it was requested.
2208 verify(mockTransferCohort, never()).transferComplete();
2210 // Sync up the follower.
2211 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// First reply carries -1/-1 in the index/term slots; the transfer is not expected to complete yet.
2212 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2213 MessageCollectorActor.clearMessages(followerActor);
// Wait one heartbeat interval plus 1 ms, then deliver SendHeartBeat by hand so the leader emits
// another AppendEntries (presumably the leader only sends when the interval has elapsed - confirm).
2215 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2216 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2217 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2218 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Second reply carries 1/1; the assertions below expect this one to let the transfer finish.
2219 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2221 // Leader should force an election timeout
2222 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
// The cohort must now have been notified exactly once (Mockito's default verification mode).
2224 verify(mockTransferCohort).transferComplete();
// Test: a requested leadership transfer is aborted when the follower never acknowledges the
// replicated entry. After a number of heartbeats equal to the election timeout factor, with no
// follower replies in between, the test expects abortTransfer() on the cohort and never
// transferComplete().
2228 public void testTransferLeadershipWithFollowerSyncTimeout() {
2229 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2231 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// 200 ms heartbeat interval and an election timeout factor of 2; the factor is the iteration
// count of the heartbeat loop below.
2232 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2233 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2234 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
// Replace the context's replicated log with one built from a builder that had nothing added to it.
2235 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2237 leader = new Leader(leaderActorContext);
2238 leaderActorContext.setCurrentBehavior(leader);
2240 // Initial heartbeat
2241 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2242 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2243 MessageCollectorActor.clearMessages(followerActor);
// Replicate one entry via the sendReplicate helper (second argument 0 - presumably the log
// index; confirm against the helper). The follower receives the AppendEntries, but the test
// deliberately feeds no AppendEntriesReply back to the leader for it.
2245 sendReplicate(leaderActorContext, 0);
2246 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2248 MessageCollectorActor.clearMessages(followerActor);
// Unlike the other transfer tests, getRequestedFollowerId() is left unstubbed on this mock.
2250 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2251 leader.transferLeadership(mockTransferCohort);
2253 verify(mockTransferCohort, never()).transferComplete();
2255 // Send heartbeats to time out the transfer.
// Each iteration sleeps one heartbeat interval plus 1 ms and then hands SendHeartBeat to the leader.
2256 for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2257 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2258 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2259 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
// Outcome: the transfer was aborted and never reported complete.
2262 verify(mockTransferCohort).abortTransfer();
2263 verify(mockTransferCohort, never()).transferComplete();
// NOTE(review): the sibling transfer tests in this file expect TimeoutNow as the message the
// leader sends to force an election, whereas this negative check looks for ElectionTimeout.
// If the leader never sends ElectionTimeout to a follower this assertion is vacuous - confirm
// whether TimeoutNow.class was intended.
2264 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
// Test: replication of a payload that is too large for the configured chunk size.
// Expected sequence, as asserted below:
//   1. a normal entry (index 0) is replicated and committed;
//   2. the large entry (index 1) goes out as MessageSlices (2 slices in total) together with an
//      empty AppendEntries;
//   3. a heartbeat while slicing is in progress yields another empty AppendEntries;
//   4. after both slice replies and an AppendEntriesReply, a further normal entry (index 2) is
//      sent in a regular AppendEntries with leaderCommit 1.
2268 public void testReplicationWithPayloadSizeThatExceedsThreshold() {
2269 logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
// Java-serialized byte length of an AppendEntries that carries one entry with a small "large"
// payload. The value is reused as the second MockPayload constructor argument (presumably the
// payload's reported size - confirm against MockPayload) and as the basis for the chunk size.
2271 final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
2272 Arrays.asList(new SimpleReplicatedLogEntry(0, 1,
2273 new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
2274 final MockRaftActorContext.MockPayload largePayload =
2275 new MockRaftActorContext.MockPayload("large", serializedSize);
2277 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2278 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2279 new FiniteDuration(300, TimeUnit.MILLISECONDS));
// Chunk size is set 50 below serializedSize; the test later expects exactly 2 slices.
2280 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
// Start from a log built from an untouched builder, with commit index and last applied at -1.
2281 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2282 leaderActorContext.setCommitIndex(-1);
2283 leaderActorContext.setLastApplied(-1);
2285 leader = new Leader(leaderActorContext);
2286 leaderActorContext.setCurrentBehavior(leader);
2288 // Send initial heartbeat reply so follower is marked active
2289 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2290 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2291 MessageCollectorActor.clearMessages(followerActor);
2293 // Send normal payload first to prime commit index.
2294 final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2295 sendReplicate(leaderActorContext, term, 0);
// The follower should receive exactly one entry, at index 0.
2297 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2298 assertEquals("Entries size", 1, appendEntries.getEntries().size());
2299 assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
// Acknowledge index 0; the leader's commit index is expected to advance to 0 as a result.
2301 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
2302 assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
2303 MessageCollectorActor.clearMessages(followerActor);
2305 // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
2306 sendReplicate(leaderActorContext, term, 1, largePayload);
// First slice: slice index 1 of 2 (the asserted slice indices are 1-based).
2308 MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2309 assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
2310 assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
// Keep the slice identifier so the simulated MessageSliceReply messages below can reference it.
2312 final Identifier slicingId = messageSlice.getIdentifier();
// Alongside the slice the follower also gets an AppendEntries with no entries. It is expected to
// carry leaderCommit -1 even though the context's commit index was asserted to be 0 above.
2314 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2315 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
2316 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
2317 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2318 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2319 MessageCollectorActor.clearMessages(followerActor);
2321 // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
2323 // Sleep for the heartbeat interval so AppendEntries is sent.
2324 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2325 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2327 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2329 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2330 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2331 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2332 MessageCollectorActor.clearMessages(followerActor);
2334 // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
// Acknowledging slice 1 should make the leader send slice 2.
2336 leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
2337 messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2338 assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
// Acknowledge the final slice, then acknowledge the entry itself (index 1) via AppendEntriesReply.
2340 leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
2342 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
2344 MessageCollectorActor.clearMessages(followerActor);
2346 // Send another normal payload.
2348 sendReplicate(leaderActorContext, term, 2);
// Regular replication resumes: one entry at index 2, with leaderCommit now reported as 1.
2350 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2351 assertEquals("Entries size", 1, appendEntries.getEntries().size());
2352 assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
2353 assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
// Test: slicing of a large payload stops when no slice reply arrives for long enough, and is
// started again by a later AppendEntriesReply. Asserted sequence: first MessageSlice is sent;
// after the long sleep a heartbeat produces only an empty AppendEntries and no MessageSlice;
// after an AppendEntriesReply a MessageSlice is sent again.
2357 public void testLargePayloadSlicingExpiration() {
2358 logStart("testLargePayloadSlicingExpiration");
2360 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// 100 ms heartbeat, election timeout factor 1, and a chunk size of only 10 so that the payload
// created below (chunk size + 1) is guaranteed to exceed it.
2361 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2362 new FiniteDuration(100, TimeUnit.MILLISECONDS));
2363 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
2364 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
// Start from a log built from an untouched builder, with commit index and last applied at -1.
2365 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2366 leaderActorContext.setCommitIndex(-1);
2367 leaderActorContext.setLastApplied(-1);
2369 final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2370 leader = new Leader(leaderActorContext);
2371 leaderActorContext.setCurrentBehavior(leader);
2373 // Send initial heartbeat reply so follower is marked active
2374 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2375 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2376 MessageCollectorActor.clearMessages(followerActor);
// Replicate a payload whose second constructor argument is chunk size + 1 (presumably its
// reported size - confirm against MockPayload); the follower is expected to get a MessageSlice.
2378 sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
2379 leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
2380 MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2382 // Sleep for at least 3 * election timeout so the slicing state expires.
// No MessageSliceReply is delivered to the leader before or during this sleep.
2383 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2384 .getElectionTimeOutInterval().toMillis() * 3 + 50, TimeUnit.MILLISECONDS);
2385 MessageCollectorActor.clearMessages(followerActor);
// A heartbeat after the sleep must yield an empty AppendEntries with leaderCommit -1 ...
2387 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2389 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2390 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2391 assertEquals("Entries size", 0, appendEntries.getEntries().size());
// ... and no MessageSlice (third argument 300 is presumably a wait in milliseconds - confirm
// against MessageCollectorActor.assertNoneMatching).
2393 MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
2394 MessageCollectorActor.clearMessages(followerActor);
2396 // Send an AppendEntriesReply - this should restart the slicing.
// Wait one heartbeat interval plus 50 ms before delivering the reply.
2398 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2399 .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
2401 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
2403 MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
// Test: the leader address in AppendEntries is present only while the follower asks for it.
// Asserted sequence: absent in the initial heartbeat; present (and equal to the resolver's
// result) after a reply that requests it; absent again after a reply that does not.
2407 public void testLeaderAddressInAppendEntries() {
2408 logStart("testLeaderAddressInAppendEntries");
2410 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2411 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2412 FiniteDuration.create(50, TimeUnit.MILLISECONDS));
// Start from a log built from an untouched builder, with commit index and last applied at -1.
2413 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2414 leaderActorContext.setCommitIndex(-1);
2415 leaderActorContext.setLastApplied(-1);
// The resolver ignores the peer id and always returns leaderActor's path; the same string is
// what the test later expects to find as the leader address.
2417 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
2418 peerId -> leaderActor.path().toString());
2420 leader = new Leader(leaderActorContext);
2421 leaderActorContext.setCurrentBehavior(leader);
2423 // Initial heartbeat shouldn't have the leader address
2425 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2426 assertFalse(appendEntries.getLeaderAddress().isPresent());
2427 MessageCollectorActor.clearMessages(followerActor);
2429 // Send AppendEntriesReply indicating the follower needs the leader address
// This reply passes (false, true) as the two booleans before the version argument; the later
// "does not need" reply passes (false, false), so the second boolean is the one that differs.
2431 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
2432 RaftVersions.CURRENT_VERSION));
2434 // Sleep for the heartbeat interval so AppendEntries is sent.
2435 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2436 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2438 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
// The next AppendEntries must carry the address produced by the resolver configured above.
2440 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2441 assertTrue(appendEntries.getLeaderAddress().isPresent());
2442 assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get());
2443 MessageCollectorActor.clearMessages(followerActor);
2445 // Send AppendEntriesReply indicating the follower does not need the leader address
2447 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
2448 RaftVersions.CURRENT_VERSION));
// Wait one heartbeat interval again, then trigger the heartbeat by hand.
2450 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2451 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2453 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
// With the request withdrawn, the address must be absent again.
2455 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2456 assertFalse(appendEntries.getLeaderAddress().isPresent());
// Runs the superclass version of this check with the same arguments, then additionally asserts
// that the votedFor value held by the context's term information is null.
// NOTE(review): assertNull is already statically imported in this file and would express the
// second check more directly than assertEquals(..., null, ...).
2460 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
2461 final ActorRef actorRef, final RaftRPC rpc) {
2462 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2463 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
// DefaultConfigParamsImpl subclass whose election timeout interval and snapshot chunk size are
// fixed by the values passed to its constructor.
2466 private static class MockConfigParamsImpl extends DefaultConfigParamsImpl {
// Election timeout interval in milliseconds, as supplied to the constructor.
2468 private final long electionTimeOutIntervalMillis;
// Snapshot chunk size, as supplied to the constructor.
2469 private final int snapshotChunkSize;
// Stores both values unchanged; no validation is performed here.
2471 MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
2472 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2473 this.snapshotChunkSize = snapshotChunkSize;
// Returns the stored millisecond value wrapped in a new FiniteDuration on every call.
2477 public FiniteDuration getElectionTimeOutInterval() {
2478 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
// Returns the stored chunk size.
2482 public int getSnapshotChunkSize() {
2483 return snapshotChunkSize;