2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.verify;
22 import akka.actor.ActorRef;
23 import akka.actor.PoisonPill;
24 import akka.actor.Props;
25 import akka.actor.Terminated;
26 import akka.protobuf.ByteString;
27 import akka.testkit.TestActorRef;
28 import akka.testkit.javadsl.TestKit;
29 import com.google.common.base.Optional;
30 import com.google.common.collect.ImmutableList;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.io.ByteSource;
33 import com.google.common.util.concurrent.Uninterruptibles;
34 import java.io.IOException;
35 import java.io.OutputStream;
36 import java.util.Arrays;
37 import java.util.Collections;
38 import java.util.HashMap;
39 import java.util.List;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicReference;
43 import org.apache.commons.lang3.SerializationUtils;
44 import org.junit.After;
45 import org.junit.Test;
46 import org.opendaylight.controller.cluster.messaging.MessageSlice;
47 import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
48 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
49 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
50 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
51 import org.opendaylight.controller.cluster.raft.RaftActorContext;
52 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
53 import org.opendaylight.controller.cluster.raft.RaftState;
54 import org.opendaylight.controller.cluster.raft.RaftVersions;
55 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
56 import org.opendaylight.controller.cluster.raft.VotingState;
57 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
58 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
59 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
60 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
61 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
62 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
63 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
64 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
65 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
66 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
67 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
68 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
69 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
70 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
71 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
72 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
73 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
74 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
75 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
76 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
77 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
78 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
79 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
80 import org.opendaylight.yangtools.concepts.Identifier;
81 import scala.concurrent.duration.FiniteDuration;
83 public class LeaderTest extends AbstractLeaderTest<Leader> {
// Peer id used for the follower in the AppendEntriesReply messages these tests hand to the leader.
85 static final String FOLLOWER_ID = "follower";
// Leader id constant; not referenced in the visible part of this file - presumably used by helpers or
// other tests (confirm).
86 public static final String LEADER_ID = "leader";
// Test actor the tests pass as the sender of SendHeartBeat/Replicate and inspect for ApplyState messages.
88 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
89 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
// Test actor standing in for the follower; the tests look up AppendEntries / InstallSnapshot messages on it.
91 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
92 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
// Leader behavior under test; each test assigns a fresh instance.
94 private Leader leader;
// Payload version set on the actor context and expected back in the AppendEntries the leader sends.
95 private final short payloadVersion = 5;
/**
 * Per-test cleanup. Only the null guard on {@code leader} is visible here; the guarded statement(s)
 * and any further cleanup lie outside the visible source.
 */
99 public void tearDown() {
100 if (leader != null) {
/**
 * Verifies that {@code handleMessage} returns {@code null} for a message the leader does not
 * recognize (here a plain String).
 */
108 public void testHandleMessageForUnknownMessage() {
109 logStart("testHandleMessageForUnknownMessage");
111 leader = new Leader(createActorContext());
113 // handleMessage should return null when it receives an unknown message
114 assertNull(leader.handleMessage(followerActor, "foo"));
/**
 * Verifies the leader's heartbeat traffic to a follower: a newly created leader first sends an
 * AppendEntries with no entries, and after the follower has replied and a SendHeartBeat is handled,
 * it sends an AppendEntries carrying the one entry the follower reported it was missing.
 */
118 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
119 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
121 MockRaftActorContext actorContext = createActorContextWithFollower();
122 actorContext.setCommitIndex(-1);
123 actorContext.setPayloadVersion(payloadVersion);
// NOTE(review): the declaration of `term` is not visible in this view of the file.
126 actorContext.getTermInformation().update(term, "");
128 leader = new Leader(actorContext);
129 actorContext.setCurrentBehavior(leader);
131 // Leader should send an immediate heartbeat with no entries as follower is inactive.
132 final long lastIndex = actorContext.getReplicatedLog().lastIndex();
133 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
134 assertEquals("getTerm", term, appendEntries.getTerm());
135 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
136 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
137 assertEquals("Entries size", 0, appendEntries.getEntries().size());
138 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
140 // The follower would normally reply - simulate that explicitly here.
// The reply reports lastIndex - 1, i.e. the follower is one entry behind the leader's log.
141 leader.handleMessage(followerActor, new AppendEntriesReply(
142 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
143 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard what the follower actor has collected so far so only the next AppendEntries is examined.
145 followerActor.underlyingActor().clear();
147 // Sleep for the heartbeat interval so AppendEntries is sent.
148 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
149 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
151 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
// The heartbeat is expected to carry exactly the entry at lastIndex, preceded by lastIndex - 1.
153 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
154 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
155 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
156 assertEquals("Entries size", 1, appendEntries.getEntries().size());
157 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
158 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
159 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
/**
 * Replicates a new entry at {@code index} using term 1; delegates to the term-taking overload.
 *
 * @return the behavior returned by the leader after handling the Replicate message
 */
163 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
164 return sendReplicate(actorContext, 1, index);
/**
 * Replicates a new entry with the given term and index and a {@code MockPayload("foo")} payload;
 * delegates to the payload-taking overload. (The rest of the parameter list is on a line not
 * visible in this view of the file.)
 *
 * @return the behavior returned by the leader after handling the Replicate message
 */
167 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term,
169 return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
/**
 * Builds a {@code SimpleReplicatedLogEntry} from the given index, term and payload, appends it to the
 * context's replicated log, and hands a {@code Replicate} message for it to the leader under test.
 *
 * @return the behavior returned by {@code leader.handleMessage}
 */
172 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
173 final Payload payload) {
174 SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
// The entry is appended to the log before the leader is told to replicate it.
175 actorContext.getReplicatedLog().append(newEntry);
// The first two Replicate arguments (a client actor and an identifier in other calls in this file) are null here.
176 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
/**
 * Verifies that handling a Replicate message makes the leader send the follower an AppendEntries
 * carrying the new entry, that the behavior stays a Leader, and that the commit index is still the
 * previous last index at that point (no reply for the new entry is simulated).
 */
180 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
181 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
183 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): the declaration of `term` is not visible in this view of the file.
186 actorContext.getTermInformation().update(term, "");
188 leader = new Leader(actorContext);
190 // Leader will send an immediate heartbeat - ignore it.
191 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
193 // The follower would normally reply - simulate that explicitly here.
194 long lastIndex = actorContext.getReplicatedLog().lastIndex();
195 leader.handleMessage(followerActor, new AppendEntriesReply(
196 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
197 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only the AppendEntries caused by the replicate is examined.
199 followerActor.underlyingActor().clear();
201 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
203 // State should not change
204 assertTrue(raftBehavior instanceof Leader);
206 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
207 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
208 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
209 assertEquals("Entries size", 1, appendEntries.getEntries().size());
210 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
211 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
212 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// The commit index is expected not to have advanced to the new entry yet.
213 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
/**
 * Verifies the Raft §5.4.1 commit rule: entries left over from a previous term are not committed
 * merely because the follower has them, but become committed once an entry from the leader's
 * current term has been replicated and acknowledged.
 */
217 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() {
218 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
220 MockRaftActorContext actorContext = createActorContextWithFollower();
221 actorContext.setCommitIndex(-1);
222 actorContext.setLastApplied(-1);
224 // The raft context is initialized with a couple of log entries. However the commitIndex
225 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
226 // committed and applied. Now it regains leadership with a higher term (2).
227 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
228 long newTerm = prevTerm + 1;
229 actorContext.getTermInformation().update(newTerm, "");
231 leader = new Leader(actorContext);
232 actorContext.setCurrentBehavior(leader);
234 // Leader will send an immediate heartbeat - ignore it.
235 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
237 // The follower replies with the leader's current last index and term, simulating that it is
238 // up to date with the leader.
239 long lastIndex = actorContext.getReplicatedLog().lastIndex();
240 leader.handleMessage(followerActor, new AppendEntriesReply(
241 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
243 // The commit index should not get updated even though consensus was reached. This is b/c the
244 // last entry's term does not match the current term. As per §5.4.1, "Raft never commits log entries
245 // from previous terms by counting replicas".
246 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
248 followerActor.underlyingActor().clear();
250 // Now replicate a new entry with the new term 2.
251 long newIndex = lastIndex + 1;
252 sendReplicate(actorContext, newTerm, newIndex);
254 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
255 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
256 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
257 assertEquals("Entries size", 1, appendEntries.getEntries().size());
258 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
259 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
260 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
262 // The follower replies with success. The leader should now update the commit index to the new index
263 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
264 // prior entries are committed indirectly".
265 leader.handleMessage(followerActor, new AppendEntriesReply(
266 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
268 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
/**
 * Verifies that, with the raft policy installed by this test, handling a Replicate message advances
 * the commit index to the new entry immediately, before any follower reply for that entry, while
 * still sending the entry to the follower in an AppendEntries.
 */
272 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() {
273 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
275 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): createRaftPolicy is defined outside this view; the meaning of its two flags is not
// visible here - presumably one of them enables applying modifications before consensus (confirm).
276 actorContext.setRaftPolicy(createRaftPolicy(true, true));
// NOTE(review): the declaration of `term` is not visible in this view of the file.
279 actorContext.getTermInformation().update(term, "");
281 leader = new Leader(actorContext);
283 // Leader will send an immediate heartbeat - ignore it.
284 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
286 // The follower would normally reply - simulate that explicitly here.
287 long lastIndex = actorContext.getReplicatedLog().lastIndex();
288 leader.handleMessage(followerActor, new AppendEntriesReply(
289 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
290 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
292 followerActor.underlyingActor().clear();
294 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
296 // State should not change
297 assertTrue(raftBehavior instanceof Leader);
299 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
300 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
301 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
302 assertEquals("Entries size", 1, appendEntries.getEntries().size());
303 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
304 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
305 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// Unlike the default-policy test, the commit index is expected to already include the new entry.
306 assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
/**
 * Verifies that several Replicate messages handled back-to-back, with no AppendEntriesReply in between
 * and within the (5 second) heartbeat interval, result in only a single AppendEntries being sent to
 * the follower.
 */
310 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
// Log the name of this test (the original passed the name of a different test, a copy-paste slip).
311 logStart("testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent");
313 MockRaftActorContext actorContext = createActorContextWithFollower();
// Use a long heartbeat interval so it cannot elapse while the test runs.
314 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
316 public FiniteDuration getHeartBeatInterval() {
317 return FiniteDuration.apply(5, TimeUnit.SECONDS);
// NOTE(review): the declaration of `term` is not visible in this view of the file.
322 actorContext.getTermInformation().update(term, "");
324 leader = new Leader(actorContext);
326 // Leader will send an immediate heartbeat - ignore it.
327 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
329 // The follower would normally reply - simulate that explicitly here.
330 long lastIndex = actorContext.getReplicatedLog().lastIndex();
331 leader.handleMessage(followerActor, new AppendEntriesReply(
332 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
333 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
335 followerActor.underlyingActor().clear();
// Replicate five consecutive entries without simulating any follower reply.
337 for (int i = 0; i < 5; i++) {
338 sendReplicate(actorContext, lastIndex + i + 1);
341 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
342 // We expect only 1 message to be sent because of two reasons,
343 // - an append entries reply was not received
344 // - the heartbeat interval has not expired
345 // In this scenario if multiple messages are sent they would likely be duplicates
346 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
/**
 * Verifies that when every Replicate is followed by a successful AppendEntriesReply, each replicate
 * yields two AppendEntries (one carrying the entry, one empty with the advanced leader commit), and
 * that a replicate issued while the previous one is still unacknowledged sends nothing further.
 */
350 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
351 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
353 MockRaftActorContext actorContext = createActorContextWithFollower();
// Use a long heartbeat interval so it cannot elapse while the test runs.
354 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
356 public FiniteDuration getHeartBeatInterval() {
357 return FiniteDuration.apply(5, TimeUnit.SECONDS);
// NOTE(review): the declaration of `term` is not visible in this view of the file.
362 actorContext.getTermInformation().update(term, "");
364 leader = new Leader(actorContext);
366 // Leader will send an immediate heartbeat - ignore it.
367 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
369 // The follower would normally reply - simulate that explicitly here.
370 long lastIndex = actorContext.getReplicatedLog().lastIndex();
371 leader.handleMessage(followerActor, new AppendEntriesReply(
372 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
373 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
375 followerActor.underlyingActor().clear();
// Three rounds of: replicate one entry, then simulate the follower acknowledging it.
377 for (int i = 0; i < 3; i++) {
378 sendReplicate(actorContext, lastIndex + i + 1);
379 leader.handleMessage(followerActor, new AppendEntriesReply(
380 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
383 // We are expecting six messages here -- for each of the three entries, a request to replicate
// followed by a consensus-reached message
384 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
385 assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
386 for (int i = 0; i < 3; i++) {
387 assertRequestEntry(lastIndex, allMessages, i);
388 assertCommitEntry(lastIndex, allMessages, i);
391 // Now perform another commit, eliciting a request to persist
392 sendReplicate(actorContext, lastIndex + 3 + 1);
393 allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
394 // This elicits another message for request to replicate
395 assertEquals("The number of request entries collected", 7, allMessages.size());
396 assertRequestEntry(lastIndex, allMessages, 3);
// A further replicate without an intervening reply must not produce an additional AppendEntries.
398 sendReplicate(actorContext, lastIndex + 4 + 1);
399 allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
400 assertEquals("The number of request entries collected", 7, allMessages.size());
/**
 * Asserts that the message at position {@code 2 * messageNr + 1} in {@code allMessages} is the empty
 * follow-up for round {@code messageNr}: its leader commit equals {@code lastIndex + messageNr + 1}
 * and it carries no entries.
 *
 * @param lastIndex the leader's last log index before the first replicated entry
 * @param allMessages the AppendEntries collected from the follower, in order
 * @param messageNr zero-based replicate round being checked
 */
403 private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
404 final int messageNr) {
405 final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
406 assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
407 assertEquals(ImmutableList.of(), commitReq.getEntries());
/**
 * Asserts that the message at position {@code 2 * messageNr} in {@code allMessages} is the replicate
 * request for round {@code messageNr}: its leader commit equals {@code lastIndex + messageNr} and it
 * carries exactly one entry whose index is {@code lastIndex + messageNr + 1}.
 *
 * @param lastIndex the leader's last log index before the first replicated entry
 * @param allMessages the AppendEntries collected from the follower, in order
 * @param messageNr zero-based replicate round being checked
 */
410 private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
411 final int messageNr) {
412 final AppendEntries req = allMessages.get(2 * messageNr);
413 assertEquals(lastIndex + messageNr, req.getLeaderCommit());
415 final List<ReplicatedLogEntry> entries = req.getEntries();
416 assertEquals(1, entries.size());
// Derive the expected index from lastIndex (callers replicate at lastIndex + messageNr + 1) instead of
// the previous hard-coded "messageNr + 2", which was only right when lastIndex happened to be 1.
417 assertEquals(lastIndex + messageNr + 1, entries.get(0).getIndex());
/**
 * Verifies that an entry which was replicated but never acknowledged by the follower is sent again
 * when a SendHeartBeat is handled after the (500 ms) heartbeat interval has elapsed, so the follower
 * receives two AppendEntries that each carry the same single entry.
 */
421 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
422 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
424 MockRaftActorContext actorContext = createActorContextWithFollower();
425 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
427 public FiniteDuration getHeartBeatInterval() {
428 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
// NOTE(review): the declaration of `term` is not visible in this view of the file.
433 actorContext.getTermInformation().update(term, "");
435 leader = new Leader(actorContext);
437 // Leader will send an immediate heartbeat - ignore it.
438 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
440 // The follower would normally reply - simulate that explicitly here.
441 long lastIndex = actorContext.getReplicatedLog().lastIndex();
442 leader.handleMessage(followerActor, new AppendEntriesReply(
443 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
444 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
446 followerActor.underlyingActor().clear();
// Replicate one entry; no follower reply is simulated for it.
448 sendReplicate(actorContext, lastIndex + 1);
450 // Wait slightly longer than heartbeat duration
451 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
453 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
455 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
456 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// First message: the original replicate request.
458 assertEquals(1, allMessages.get(0).getEntries().size());
459 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
// Second message: the heartbeat re-send. Check message 1 here; the original re-checked message 0, so
// the duplicate's entry index was never actually verified.
460 assertEquals(1, allMessages.get(1).getEntries().size());
461 assertEquals(lastIndex + 1, allMessages.get(1).getEntries().get(0).getIndex());
/**
 * Verifies that every SendHeartBeat handled after the (100 ms) heartbeat interval has elapsed produces
 * an AppendEntries to the follower: three heartbeats, each preceded by a 150 ms sleep, yield three
 * messages.
 */
466 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
467 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
469 MockRaftActorContext actorContext = createActorContextWithFollower();
470 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
472 public FiniteDuration getHeartBeatInterval() {
473 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
// NOTE(review): the declaration of `term` is not visible in this view of the file.
478 actorContext.getTermInformation().update(term, "");
480 leader = new Leader(actorContext);
482 // Leader will send an immediate heartbeat - ignore it.
483 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
485 // The follower would normally reply - simulate that explicitly here.
486 long lastIndex = actorContext.getReplicatedLog().lastIndex();
487 leader.handleMessage(followerActor, new AppendEntriesReply(
488 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
489 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
491 followerActor.underlyingActor().clear();
// Sleep longer than the 100 ms heartbeat interval before each heartbeat.
493 for (int i = 0; i < 3; i++) {
494 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
495 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
498 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
499 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
/**
 * Verifies that a Replicate handled right after a heartbeat is still sent to the follower: the
 * follower receives two AppendEntries, the first (heartbeat) with no entries and the second with the
 * newly replicated entry.
 */
503 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
504 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
506 MockRaftActorContext actorContext = createActorContextWithFollower();
507 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
509 public FiniteDuration getHeartBeatInterval() {
510 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
// NOTE(review): the declaration of `term` is not visible in this view of the file.
515 actorContext.getTermInformation().update(term, "");
517 leader = new Leader(actorContext);
519 // Leader will send an immediate heartbeat - ignore it.
520 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
522 // The follower would normally reply - simulate that explicitly here.
523 long lastIndex = actorContext.getReplicatedLog().lastIndex();
524 leader.handleMessage(followerActor, new AppendEntriesReply(
525 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
526 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
528 followerActor.underlyingActor().clear();
// Let the 100 ms heartbeat interval elapse, send the heartbeat, then replicate with no delay.
530 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
531 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
532 sendReplicate(actorContext, lastIndex + 1);
534 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
535 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// Message 0 is the empty heartbeat; message 1 carries the replicated entry.
537 assertEquals(0, allMessages.get(0).getEntries().size());
538 assertEquals(1, allMessages.get(1).getEntries().size());
/**
 * Verifies that a leader with no followers commits a replicated entry straight away: the commit index
 * moves to the new entry and ApplyState messages are delivered to the leader actor for every entry
 * after lastApplied, the last of which carries the new entry's data and the client identifier.
 */
543 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
544 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
546 MockRaftActorContext actorContext = createActorContext();
548 leader = new Leader(actorContext);
550 actorContext.setLastApplied(0);
552 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
553 long term = actorContext.getTermInformation().getCurrentTerm();
554 ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
555 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
557 actorContext.getReplicatedLog().append(newEntry);
559 final Identifier id = new MockIdentifier("state-id");
560 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
561 new Replicate(leaderActor, id, newEntry, true));
563 // State should not change
564 assertTrue(raftBehavior instanceof Leader);
566 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
568 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
569 // one since lastApplied state is 0.
570 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
571 leaderActor, ApplyState.class);
// The expected count equals newLogIndex because the applied entries are those at indexes 1..newLogIndex.
572 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
// The i-th ApplyState is expected to be for log index i + 1.
574 for (int i = 0; i <= newLogIndex - 1; i++) {
575 ApplyState applyState = applyStateList.get(i);
576 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
577 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
// The final ApplyState corresponds to the newly replicated entry and carries the Replicate's identifier.
580 ApplyState last = applyStateList.get((int) newLogIndex - 1);
581 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
582 assertEquals("getIdentifier", id, last.getIdentifier());
/**
 * Verifies heartbeat behavior while a snapshot install to the follower is in progress: with a chunk
 * outstanding the heartbeat sends an AppendEntries with no entries, and once the chunk has been marked
 * as successfully sent the next heartbeat sends an InstallSnapshot.
 */
586 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
587 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
589 final MockRaftActorContext actorContext = createActorContextWithFollower();
// State that is serialized below to serve as the snapshot bytes.
591 Map<String, String> leadersSnapshot = new HashMap<>();
592 leadersSnapshot.put("1", "A");
593 leadersSnapshot.put("2", "B");
594 leadersSnapshot.put("3", "C");
// Empty the replicated log from index 0 onwards.
597 actorContext.getReplicatedLog().removeFrom(0);
599 final int commitIndex = 3;
600 final int snapshotIndex = 2;
601 final int snapshotTerm = 1;
603 // set the snapshot variables in replicatedlog
604 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
605 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
606 actorContext.setCommitIndex(commitIndex);
607 //set follower timeout to 2 mins, helps during debugging
608 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
610 leader = new Leader(actorContext);
// Put the follower in a state where it has matched nothing and needs the log from index 0.
612 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
613 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
615 //update follower timestamp
616 leader.markFollowerActive(FOLLOWER_ID);
// Install the serialized snapshot on the leader and attach an install-snapshot state to the follower.
618 ByteString bs = toByteString(leadersSnapshot);
619 leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
620 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
621 -1, null, null), ByteSource.wrap(bs.toByteArray())));
622 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
623 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
624 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
625 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
627 //send first chunk and no InstallSnapshotReply received yet
629 fts.incrementChunkIndex();
631 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
632 TimeUnit.MILLISECONDS);
634 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
636 AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
638 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
640 //InstallSnapshotReply received
641 fts.markSendStatus(true);
643 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
645 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
// The snapshot held by the leader was created with commitIndex as its last index.
647 assertEquals(commitIndex, is.getLastIncludedIndex());
/**
 * Verifies that replicating a new entry while the leader's snapshot index is ahead of the follower
 * causes the leader to start capturing a snapshot (the snapshot manager reports isCapturing) and to
 * remain a Leader.
 */
651 public void testSendAppendEntriesSnapshotScenario() {
652 logStart("testSendAppendEntriesSnapshotScenario");
654 final MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): leadersSnapshot is populated but not referenced again in the visible lines of this test.
656 Map<String, String> leadersSnapshot = new HashMap<>();
657 leadersSnapshot.put("1", "A");
658 leadersSnapshot.put("2", "B");
659 leadersSnapshot.put("3", "C");
// Empty the replicated log from index 0 onwards.
662 actorContext.getReplicatedLog().removeFrom(0);
664 final int followersLastIndex = 2;
665 final int snapshotIndex = 3;
666 final int newEntryIndex = 4;
667 final int snapshotTerm = 1;
668 final int currentTerm = 2;
670 // set the snapshot variables in replicatedlog
671 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
672 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
673 actorContext.setCommitIndex(followersLastIndex);
675 leader = new Leader(actorContext);
677 // Leader will send an immediate heartbeat - ignore it.
678 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// New entry placed just after the snapshot index.
681 SimpleReplicatedLogEntry entry =
682 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
683 new MockRaftActorContext.MockPayload("D"));
685 actorContext.getReplicatedLog().append(entry);
687 //update follower timestamp
688 leader.markFollowerActive(FOLLOWER_ID);
690 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
691 RaftActorBehavior raftBehavior = leader.handleMessage(
692 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
694 assertTrue(raftBehavior instanceof Leader);
696 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
/**
 * Verifies that replicating an entry when the leader holds no snapshot starts a snapshot capture whose
 * CaptureSnapshot reflects the context's last-applied and last log index/term, and that a second
 * Replicate while the capture is in progress does not replace that CaptureSnapshot instance.
 */
700 public void testInitiateInstallSnapshot() {
701 logStart("testInitiateInstallSnapshot");
703 MockRaftActorContext actorContext = createActorContextWithFollower();
// Empty the replicated log from index 0 onwards.
706 actorContext.getReplicatedLog().removeFrom(0);
708 final int followersLastIndex = 2;
709 final int snapshotIndex = 3;
710 final int newEntryIndex = 4;
711 final int snapshotTerm = 1;
712 final int currentTerm = 2;
714 // set the snapshot variables in replicatedlog
715 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
716 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
717 actorContext.setLastApplied(3);
718 actorContext.setCommitIndex(followersLastIndex);
720 leader = new Leader(actorContext);
722 // Leader will send an immediate heartbeat - ignore it.
723 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
725 // set the snapshot as absent and check if capture-snapshot is invoked.
726 leader.setSnapshotHolder(null);
729 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
730 new MockRaftActorContext.MockPayload("D"));
732 actorContext.getReplicatedLog().append(entry);
734 //update follower timestamp
735 leader.markFollowerActive(FOLLOWER_ID);
737 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
739 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
741 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
// Expected values mirror the setup above: lastApplied 3 / snapshotTerm 1, new entry index 4 / currentTerm 2.
743 assertEquals(3, cs.getLastAppliedIndex());
744 assertEquals(1, cs.getLastAppliedTerm());
745 assertEquals(4, cs.getLastIndex());
746 assertEquals(2, cs.getLastTerm());
748 // if an initiate is started again while the first is in progress, it shouldn't initiate another capture
749 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
751 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
755 public void testInitiateForceInstallSnapshot() throws Exception {
// Checks the forced-install path: an AppendEntriesReply from the follower triggers a snapshot
// capture, repeated triggers during the capture are ignored, persisting the captured bytes sends
// an InstallSnapshot to the follower, and a further reply does not re-send the chunk.
756 logStart("testInitiateForceInstallSnapshot");
758 MockRaftActorContext actorContext = createActorContextWithFollower();
760 final int followersLastIndex = 2;
761 final int snapshotIndex = -1;
762 final int newEntryIndex = 4;
763 final int snapshotTerm = -1;
764 final int currentTerm = 2;
766 // set the snapshot variables in the replicated log
767 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
768 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
769 actorContext.setLastApplied(3);
// Note: followersLastIndex doubles as the leader's commit index here.
770 actorContext.setCommitIndex(followersLastIndex);
772 actorContext.getReplicatedLog().removeFrom(0);
// Record whatever the context passes to its create-snapshot procedure so the test can later
// assert it was invoked and write the snapshot bytes to the supplied stream.
774 AtomicReference<java.util.Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
775 actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
777 leader = new Leader(actorContext);
778 actorContext.setCurrentBehavior(leader);
780 // Leader will send an immediate heartbeat - ignore it.
781 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
783 // set the snapshot as absent and check if capture-snapshot is invoked.
784 leader.setSnapshotHolder(null);
// Populate the leader's log with entries 0..3 in term 1.
786 for (int i = 0; i < 4; i++) {
787 actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
788 new MockRaftActorContext.MockPayload("X" + i)));
792 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
793 new MockRaftActorContext.MockPayload("D"));
795 actorContext.getReplicatedLog().append(entry);
797 //update follower timestamp
798 leader.markFollowerActive(FOLLOWER_ID);
800 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
801 // installed with a SendInstallSnapshot
// NOTE(review): presumably the `true` argument is the force-install-snapshot flag - confirm
// against the AppendEntriesReply constructor.
802 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
803 RaftVersions.CURRENT_VERSION));
805 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
807 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
808 assertEquals(3, cs.getLastAppliedIndex());
809 assertEquals(1, cs.getLastAppliedTerm());
810 assertEquals(4, cs.getLastIndex());
811 assertEquals(2, cs.getLastTerm());
813 assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
814 assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
816 MessageCollectorActor.clearMessages(followerActor);
818 // Sending Replicate message should not initiate another capture since the first is in progress.
819 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
820 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
822 // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
823 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
824 RaftVersions.CURRENT_VERSION));
825 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
827 // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
828 final byte[] bytes = new byte[]{1, 2, 3};
829 installSnapshotStream.get().get().write(bytes);
830 actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
831 Runtime.getRuntime().totalMemory());
832 MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
834 // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
835 MessageCollectorActor.clearMessages(followerActor);
836 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
837 RaftVersions.CURRENT_VERSION));
// The final argument (200) is presumably a wait duration in milliseconds - confirm against
// MessageCollectorActor.assertNoneMatching.
838 MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
843 public void testInstallSnapshot() {
// Checks that handling SendInstallSnapshot keeps the behavior a Leader and sends the follower an
// InstallSnapshot carrying the snapshot's last-included index/term and the leader's current term.
844 logStart("testInstallSnapshot");
846 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Arbitrary key/value data that is serialized below to serve as the snapshot payload.
848 Map<String, String> leadersSnapshot = new HashMap<>();
849 leadersSnapshot.put("1", "A");
850 leadersSnapshot.put("2", "B");
851 leadersSnapshot.put("3", "C");
854 actorContext.getReplicatedLog().removeFrom(0);
856 final int lastAppliedIndex = 3;
857 final int snapshotIndex = 2;
858 final int snapshotTerm = 1;
859 final int currentTerm = 2;
861 // set the snapshot variables in the replicated log
862 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
863 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
864 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
865 actorContext.setCommitIndex(lastAppliedIndex);
866 actorContext.setLastApplied(lastAppliedIndex);
868 leader = new Leader(actorContext);
870 // Initial heartbeat.
871 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Reset the leader's view of the follower: match index -1, next index 0.
873 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
874 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
876 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
877 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
878 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
880 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
881 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
883 assertTrue(raftBehavior instanceof Leader);
885 // check that InstallSnapshot is sent with the correct values.
887 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
888 InstallSnapshot.class);
890 assertNotNull(installSnapshot.getData());
891 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
892 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
894 assertEquals(currentTerm, installSnapshot.getTerm());
898 public void testForceInstallSnapshot() {
// Same flow as a regular SendInstallSnapshot check, but with the log's snapshot index/term set
// to -1 and the follower's next index set to -1: the follower must still receive an
// InstallSnapshot carrying the snapshot's last-included index/term and the current term.
899 logStart("testForceInstallSnapshot");
901 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Arbitrary key/value data that is serialized below to serve as the snapshot payload.
903 Map<String, String> leadersSnapshot = new HashMap<>();
904 leadersSnapshot.put("1", "A");
905 leadersSnapshot.put("2", "B");
906 leadersSnapshot.put("3", "C");
908 final int lastAppliedIndex = 3;
909 final int snapshotIndex = -1;
910 final int snapshotTerm = -1;
911 final int currentTerm = 2;
913 // set the snapshot variables in the replicated log
914 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
915 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
916 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
917 actorContext.setCommitIndex(lastAppliedIndex);
918 actorContext.setLastApplied(lastAppliedIndex);
920 leader = new Leader(actorContext);
922 // Initial heartbeat.
923 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
925 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
926 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
928 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
929 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
930 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
932 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
933 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
935 assertTrue(raftBehavior instanceof Leader);
937 // check that InstallSnapshot is sent with the correct values.
939 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
940 InstallSnapshot.class);
942 assertNotNull(installSnapshot.getData());
943 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
944 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
946 assertEquals(currentTerm, installSnapshot.getTerm());
950 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
// Checks the leader's bookkeeping after a successful InstallSnapshotReply for the last chunk:
// the follower's install-snapshot state is cleared, its match/next index move to
// commitIndex / commitIndex + 1, and the leader no longer reports holding a snapshot.
951 logStart("testHandleInstallSnapshotReplyLastChunk");
953 MockRaftActorContext actorContext = createActorContextWithFollower();
955 final int commitIndex = 3;
956 final int snapshotIndex = 2;
957 final int snapshotTerm = 1;
958 final int currentTerm = 2;
960 actorContext.setCommitIndex(commitIndex);
962 leader = new Leader(actorContext);
963 actorContext.setCurrentBehavior(leader);
965 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
966 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
968 // Ignore initial heartbeat.
969 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
971 Map<String, String> leadersSnapshot = new HashMap<>();
972 leadersSnapshot.put("1", "A");
973 leadersSnapshot.put("2", "B");
974 leadersSnapshot.put("3", "C");
976 // set the snapshot variables in the replicated log
978 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
979 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
980 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
982 ByteString bs = toByteString(leadersSnapshot);
983 leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
984 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
985 -1, null, null), ByteSource.wrap(bs.toByteArray())));
// Hand-build the follower's install-snapshot state instead of going through SendInstallSnapshot,
// then advance it until its current chunk index is the last chunk.
986 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
987 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
988 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
989 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
990 while (!fts.isLastChunk(fts.getChunkIndex())) {
992 fts.incrementChunkIndex();
996 actorContext.getReplicatedLog().removeFrom(0);
// Simulate the follower acknowledging the last chunk successfully.
998 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
999 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
1001 assertTrue(raftBehavior instanceof Leader);
1003 assertEquals(1, leader.followerLogSize());
1004 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
1006 assertNull(fli.getInstallSnapshotState());
1007 assertEquals(commitIndex, fli.getMatchIndex());
1008 assertEquals(commitIndex + 1, fli.getNextIndex());
1009 assertFalse(leader.hasSnapshot());
1013 public void testSendSnapshotfromInstallSnapshotReply() {
// Checks chunk-by-chunk delivery: each successful InstallSnapshotReply makes the leader send the
// next chunk (1, then 2, then another), and a reply to that last one sends nothing further.
1014 logStart("testSendSnapshotfromInstallSnapshotReply");
1016 MockRaftActorContext actorContext = createActorContextWithFollower();
1018 final int commitIndex = 3;
1019 final int snapshotIndex = 2;
1020 final int snapshotTerm = 1;
1021 final int currentTerm = 2;
// Config with an overridden snapshot chunk size; the assertions below expect the snapshot to be
// split into 3 chunks.
1023 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
1025 public int getSnapshotChunkSize() {
// NOTE(review): the multi-second intervals are presumably chosen so that no periodic heartbeat or
// isolated-leader check fires during the test - confirm.
1029 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1030 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1032 actorContext.setConfigParams(configParams);
1033 actorContext.setCommitIndex(commitIndex);
1035 leader = new Leader(actorContext);
1036 actorContext.setCurrentBehavior(leader);
1038 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1039 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1041 Map<String, String> leadersSnapshot = new HashMap<>();
1042 leadersSnapshot.put("1", "A");
1043 leadersSnapshot.put("2", "B");
1044 leadersSnapshot.put("3", "C");
1046 // set the snapshot variables in the replicated log
1047 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1048 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1049 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1051 ByteString bs = toByteString(leadersSnapshot);
1052 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1053 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1056 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1058 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1059 InstallSnapshot.class);
1061 assertEquals(1, installSnapshot.getChunkIndex());
1062 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 1; the collector is cleared first so the next match is the newly sent chunk.
1064 followerActor.underlyingActor().clear();
1065 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1066 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1068 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1070 assertEquals(2, installSnapshot.getChunkIndex());
1071 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 2 and wait for the following chunk.
1073 followerActor.underlyingActor().clear();
1074 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1075 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1077 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1079 // Send snapshot reply one more time and make sure that no new snapshot message is sent to the follower
1080 followerActor.underlyingActor().clear();
1081 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1082 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1084 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1086 assertNull(installSnapshot);
1091 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() {
// Checks recovery from a failed chunk: after the follower replies with chunk index -1 and
// success=false, the next heartbeat makes the leader send chunk 1 of 3 again.
1092 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1094 MockRaftActorContext actorContext = createActorContextWithFollower();
1096 final int commitIndex = 3;
1097 final int snapshotIndex = 2;
1098 final int snapshotTerm = 1;
1099 final int currentTerm = 2;
// Config with an overridden snapshot chunk size; the assertions below expect 3 chunks in total.
1101 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1103 public int getSnapshotChunkSize() {
1108 actorContext.setCommitIndex(commitIndex);
1110 leader = new Leader(actorContext);
1112 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1113 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1115 Map<String, String> leadersSnapshot = new HashMap<>();
1116 leadersSnapshot.put("1", "A");
1117 leadersSnapshot.put("2", "B");
1118 leadersSnapshot.put("3", "C");
1120 // set the snapshot variables in the replicated log
1121 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1122 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1123 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1125 ByteString bs = toByteString(leadersSnapshot);
1126 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1127 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
// NOTE(review): the reason for this fixed 1 second pause is not evident from the code here - confirm
// whether it is still required.
1130 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1131 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1133 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1134 InstallSnapshot.class);
1136 assertEquals(1, installSnapshot.getChunkIndex());
1137 assertEquals(3, installSnapshot.getTotalChunks());
1139 followerActor.underlyingActor().clear();
// Failed reply carrying chunk index -1.
1141 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1142 FOLLOWER_ID, -1, false));
// Wait one configured heartbeat interval before delivering SendHeartBeat by hand.
1144 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1145 TimeUnit.MILLISECONDS);
1147 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1149 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
// The first chunk is expected again rather than chunk 2.
1151 assertEquals(1, installSnapshot.getChunkIndex());
1152 assertEquals(3, installSnapshot.getTotalChunks());
1156 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() {
// Checks the last-chunk hash code carried by InstallSnapshot: the first chunk carries
// LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE, and the second chunk carries
// Arrays.hashCode of the first chunk's data.
1157 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1159 MockRaftActorContext actorContext = createActorContextWithFollower();
1161 final int commitIndex = 3;
1162 final int snapshotIndex = 2;
1163 final int snapshotTerm = 1;
1164 final int currentTerm = 2;
// Config with an overridden snapshot chunk size; the assertions below expect 3 chunks in total.
1166 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1168 public int getSnapshotChunkSize() {
1173 actorContext.setCommitIndex(commitIndex);
1175 leader = new Leader(actorContext);
1177 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1178 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1180 Map<String, String> leadersSnapshot = new HashMap<>();
1181 leadersSnapshot.put("1", "A");
1182 leadersSnapshot.put("2", "B");
1183 leadersSnapshot.put("3", "C");
1185 // set the snapshot variables in the replicated log
1186 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1187 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1188 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1190 ByteString bs = toByteString(leadersSnapshot);
1191 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1192 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1195 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1197 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1198 InstallSnapshot.class);
1200 assertEquals(1, installSnapshot.getChunkIndex());
1201 assertEquals(3, installSnapshot.getTotalChunks());
1202 assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1203 installSnapshot.getLastChunkHashCode().getAsInt());
// Remember the first chunk's hash so it can be compared with what the second chunk reports.
1205 final int hashCode = Arrays.hashCode(installSnapshot.getData());
1207 followerActor.underlyingActor().clear();
// Acknowledge chunk 1 so that the leader sends chunk 2.
1209 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1210 FOLLOWER_ID, 1, true));
1212 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1214 assertEquals(2, installSnapshot.getChunkIndex());
1215 assertEquals(3, installSnapshot.getTotalChunks());
1216 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().getAsInt());
1220 public void testLeaderInstallSnapshotState() throws IOException {
// Exercises LeaderInstallSnapshotState on its own with a chunk size of 50: walks the serialized
// snapshot in 50-byte steps and checks each chunk's length, the running chunk index and the
// total chunk count.
1221 logStart("testLeaderInstallSnapshotState");
1223 Map<String, String> leadersSnapshot = new HashMap<>();
1224 leadersSnapshot.put("1", "A");
1225 leadersSnapshot.put("2", "B");
1226 leadersSnapshot.put("3", "C");
1228 ByteString bs = toByteString(leadersSnapshot);
1229 byte[] barray = bs.toByteArray();
1231 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1232 fts.setSnapshotBytes(ByteSource.wrap(barray));
1234 assertEquals(bs.size(), barray.length);
// i is the byte offset where the current chunk starts; length is the exclusive end offset,
// clamped to the array length for the final, possibly shorter, chunk.
1237 for (int i = 0; i < barray.length; i = i + 50) {
1238 int length = i + 50;
1241 if (i + 50 > barray.length) {
1242 length = barray.length;
1245 byte[] chunk = fts.getNextChunk();
1246 assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1247 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
// Mark the chunk as sent successfully, then advance unless it was the last one.
1249 fts.markSendStatus(true);
1250 if (!fts.isLastChunk(chunkIndex)) {
1251 fts.incrementChunkIndex();
1255 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1260 protected Leader createBehavior(final RaftActorContext actorContext) {
// Creates a new Leader behavior bound to the given actor context.
1261 return new Leader(actorContext);
1265 protected MockRaftActorContext createActorContext() {
// Creates a context backed by leaderActor, delegating to the ActorRef overload.
1266 return createActorContext(leaderActor);
1270 protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
// Creates a context with id LEADER_ID backed by the given actor.
1271 return createActorContext(LEADER_ID, actorRef);
1274 private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
// Builds a MockRaftActorContext for the given id and actor, configured with a 50 ms heartbeat
// interval, an election timeout factor of 100000, and this test's payloadVersion.
1275 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1276 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
// NOTE(review): the very large factor presumably keeps election timeouts from firing during
// tests - confirm.
1277 configParams.setElectionTimeoutFactor(100000);
1278 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1279 context.setConfigParams(configParams);
1280 context.setPayloadVersion(payloadVersion);
1284 private MockRaftActorContext createActorContextWithFollower() {
// Creates the default leader context and registers followerActor's path as the address of the
// single peer FOLLOWER_ID.
1285 MockRaftActorContext actorContext = createActorContext();
1286 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1287 followerActor.path().toString()).build());
1288 return actorContext;
1291 private MockRaftActorContext createFollowerActorContextWithLeader() {
// Creates a context for FOLLOWER_ID backed by followerActor, with leaderActor registered as its
// only peer under LEADER_ID.
1292 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Replaces the config params installed by createActorContext with a fresh instance that only
// overrides the election timeout factor (10000).
1293 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1294 followerConfig.setElectionTimeoutFactor(10000);
1295 followerActorContext.setConfigParams(followerConfig);
1296 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1297 return followerActorContext;
1301 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() {
// Leader and follower start with identical logs built by createEntries(0, 3, 1) and commit
// index 1. Checks the initial AppendEntries the new leader sends and the follower's reply.
1302 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1304 final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1306 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Install a real Follower behavior on the follower actor so it answers the leader's messages.
1308 Follower follower = new Follower(followerActorContext);
1309 followerActor.underlyingActor().setBehavior(follower);
1310 followerActorContext.setCurrentBehavior(follower);
// NOTE(review): createActorContextWithFollower() already registers FOLLOWER_ID with the same
// address, so this peer map looks redundant.
1312 Map<String, String> peerAddresses = new HashMap<>();
1313 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1315 leaderActorContext.setPeerAddresses(peerAddresses);
// NOTE(review): the log is replaced wholesale right below, so this removeFrom(0) looks redundant.
1317 leaderActorContext.getReplicatedLog().removeFrom(0);
1320 leaderActorContext.setReplicatedLog(
1321 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1323 leaderActorContext.setCommitIndex(1);
1325 followerActorContext.getReplicatedLog().removeFrom(0);
1327 // follower too has the exact same log entries and has the same commit index
1328 followerActorContext.setReplicatedLog(
1329 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1331 followerActorContext.setCommitIndex(1);
1333 leader = new Leader(leaderActorContext);
1334 leaderActorContext.setCurrentBehavior(leader);
1336 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// The initial AppendEntries is expected to be empty and to carry leaderCommit -1 even though the
// leader's commit index was set to 1 above.
1338 assertEquals(-1, appendEntries.getLeaderCommit());
1339 assertEquals(0, appendEntries.getEntries().size());
1340 assertEquals(0, appendEntries.getPrevLogIndex());
1342 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1343 leaderActor, AppendEntriesReply.class);
1345 assertEquals(2, appendEntriesReply.getLogLastIndex());
1346 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): the two assertions below repeat the pair directly above on the same reply.
1348 // follower returns its next index
1349 assertEquals(2, appendEntriesReply.getLogLastIndex());
1350 assertEquals(1, appendEntriesReply.getLogLastTerm());
1356 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() {
// Leader and follower hold identical logs built by createEntries(0, 3, 1), but the follower's
// commit index (2) is ahead of the leader's (1). Checks the initial AppendEntries and, once the
// leader has processed the follower's reply, the AppendEntries sent on the next heartbeat.
1357 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1359 final MockRaftActorContext leaderActorContext = createActorContext();
1361 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1362 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
// Install a real Follower behavior on the follower actor so it answers the leader's messages.
1364 Follower follower = new Follower(followerActorContext);
1365 followerActor.underlyingActor().setBehavior(follower);
1366 followerActorContext.setCurrentBehavior(follower);
1368 Map<String, String> leaderPeerAddresses = new HashMap<>();
1369 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1371 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1373 leaderActorContext.getReplicatedLog().removeFrom(0);
1375 leaderActorContext.setReplicatedLog(
1376 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1378 leaderActorContext.setCommitIndex(1);
1380 followerActorContext.getReplicatedLog().removeFrom(0);
1382 followerActorContext.setReplicatedLog(
1383 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1385 // follower has the same log entries but its commit index > leaders commit index
1386 followerActorContext.setCommitIndex(2);
1388 leader = new Leader(leaderActorContext);
1390 // Initial heartbeat
1391 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1393 assertEquals(-1, appendEntries.getLeaderCommit());
1394 assertEquals(0, appendEntries.getEntries().size());
1395 assertEquals(0, appendEntries.getPrevLogIndex());
1397 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1398 leaderActor, AppendEntriesReply.class);
1400 assertEquals(2, appendEntriesReply.getLogLastIndex());
1401 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): this installs the *follower* behavior on leaderActor; other tests in this class
// install `leader` on leaderActor at the equivalent point - confirm this is intentional.
1403 leaderActor.underlyingActor().setBehavior(follower);
1404 leader.handleMessage(followerActor, appendEntriesReply);
// Clear both collectors so the expectations below only see post-heartbeat traffic.
1406 leaderActor.underlyingActor().clear();
1407 followerActor.underlyingActor().clear();
// Wait one configured heartbeat interval before delivering SendHeartBeat by hand.
1409 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1410 TimeUnit.MILLISECONDS);
1412 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1414 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// After the reply was processed, the heartbeat is expected to report leaderCommit 2 and
// prevLogIndex 2, with no entries.
1416 assertEquals(2, appendEntries.getLeaderCommit());
1417 assertEquals(0, appendEntries.getEntries().size());
1418 assertEquals(2, appendEntries.getPrevLogIndex());
1420 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1422 assertEquals(2, appendEntriesReply.getLogLastIndex());
1423 assertEquals(1, appendEntriesReply.getLogLastTerm());
1425 assertEquals(2, followerActorContext.getCommitIndex());
1431 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
// The leader's log is built by createEntries(0, 3, 1) with commit index 2; the follower's by
// createEntries(0, 1, 1) with commit index 0. Checks that, after handling the follower's reply
// to the initial AppendEntries, the leader sends entries 1 and 2 and the follower applies them.
1432 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1434 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// NOTE(review): the 1000 second heartbeat interval is presumably there so that no periodic
// heartbeat interferes with the message expectations - confirm.
1435 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1436 new FiniteDuration(1000, TimeUnit.SECONDS));
1438 leaderActorContext.setReplicatedLog(
1439 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1440 long leaderCommitIndex = 2;
1441 leaderActorContext.setCommitIndex(leaderCommitIndex);
1442 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the entries the follower is missing, for the data comparisons below.
1444 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1445 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1447 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1449 followerActorContext.setReplicatedLog(
1450 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1451 followerActorContext.setCommitIndex(0);
1452 followerActorContext.setLastApplied(0);
1454 Follower follower = new Follower(followerActorContext);
1455 followerActor.underlyingActor().setBehavior(follower);
1457 leader = new Leader(leaderActorContext);
// Capture the initial exchange, then clear both collectors so later expectations only see the
// follow-up traffic.
1459 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1460 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1461 AppendEntriesReply.class);
1463 MessageCollectorActor.clearMessages(followerActor);
1464 MessageCollectorActor.clearMessages(leaderActor);
1466 // Verify initial AppendEntries sent.
1467 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1468 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1469 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
// From here on the leader behavior is installed on leaderActor; presumably so that the
// follower's later replies are processed by the leader - confirm.
1471 leaderActor.underlyingActor().setBehavior(leader);
1473 leader.handleMessage(followerActor, appendEntriesReply);
1475 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1476 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// The follow-up AppendEntries must carry the leader's commit index and entries 1 and 2.
1478 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1479 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1480 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1482 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1483 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1484 appendEntries.getEntries().get(0).getData());
1485 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1486 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1487 appendEntries.getEntries().get(1).getData());
1489 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1490 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
// The follower is expected to emit one ApplyState per newly received entry.
1492 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1494 ApplyState applyState = applyStateList.get(0);
1495 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1496 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1497 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1498 applyState.getReplicatedLogEntry().getData());
1500 applyState = applyStateList.get(1);
1501 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1502 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1503 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1504 applyState.getReplicatedLogEntry().getData());
1506 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1507 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1511 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1512 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1514 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1515 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1516 new FiniteDuration(1000, TimeUnit.SECONDS));
1518 leaderActorContext.setReplicatedLog(
1519 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1520 long leaderCommitIndex = 1;
1521 leaderActorContext.setCommitIndex(leaderCommitIndex);
1522 leaderActorContext.setLastApplied(leaderCommitIndex);
1524 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1525 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1527 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1529 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1530 followerActorContext.setCommitIndex(-1);
1531 followerActorContext.setLastApplied(-1);
1533 Follower follower = new Follower(followerActorContext);
1534 followerActor.underlyingActor().setBehavior(follower);
1535 followerActorContext.setCurrentBehavior(follower);
1537 leader = new Leader(leaderActorContext);
1539 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1540 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1541 AppendEntriesReply.class);
1543 MessageCollectorActor.clearMessages(followerActor);
1544 MessageCollectorActor.clearMessages(leaderActor);
1546 // Verify initial AppendEntries sent with the leader's current commit index.
1547 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1548 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1549 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1551 leaderActor.underlyingActor().setBehavior(leader);
1552 leaderActorContext.setCurrentBehavior(leader);
1554 leader.handleMessage(followerActor, appendEntriesReply);
1556 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1557 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1559 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1560 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1561 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1563 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1564 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1565 appendEntries.getEntries().get(0).getData());
1566 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1567 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1568 appendEntries.getEntries().get(1).getData());
1570 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1571 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1573 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1575 ApplyState applyState = applyStateList.get(0);
1576 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1577 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1578 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1579 applyState.getReplicatedLogEntry().getData());
1581 applyState = applyStateList.get(1);
1582 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1583 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1584 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1585 applyState.getReplicatedLogEntry().getData());
1587 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1588 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1592 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1593 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1595 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1596 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1597 new FiniteDuration(1000, TimeUnit.SECONDS));
1599 leaderActorContext.setReplicatedLog(
1600 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1601 long leaderCommitIndex = 1;
1602 leaderActorContext.setCommitIndex(leaderCommitIndex);
1603 leaderActorContext.setLastApplied(leaderCommitIndex);
1605 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1606 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1608 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1610 followerActorContext.setReplicatedLog(
1611 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1612 followerActorContext.setCommitIndex(-1);
1613 followerActorContext.setLastApplied(-1);
1615 Follower follower = new Follower(followerActorContext);
1616 followerActor.underlyingActor().setBehavior(follower);
1617 followerActorContext.setCurrentBehavior(follower);
1619 leader = new Leader(leaderActorContext);
1621 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1622 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1623 AppendEntriesReply.class);
1625 MessageCollectorActor.clearMessages(followerActor);
1626 MessageCollectorActor.clearMessages(leaderActor);
1628 // Verify initial AppendEntries sent with the leader's current commit index.
1629 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1630 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1631 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1633 leaderActor.underlyingActor().setBehavior(leader);
1634 leaderActorContext.setCurrentBehavior(leader);
1636 leader.handleMessage(followerActor, appendEntriesReply);
1638 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1639 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1641 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1642 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1643 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1645 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1646 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1647 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1648 appendEntries.getEntries().get(0).getData());
1649 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1650 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1651 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1652 appendEntries.getEntries().get(1).getData());
1654 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1655 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1657 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1659 ApplyState applyState = applyStateList.get(0);
1660 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1661 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1662 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1663 applyState.getReplicatedLogEntry().getData());
1665 applyState = applyStateList.get(1);
1666 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1667 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1668 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1669 applyState.getReplicatedLogEntry().getData());
1671 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1672 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1673 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1677 public void testHandleAppendEntriesReplyWithNewerTerm() {
1678 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1680 MockRaftActorContext leaderActorContext = createActorContext();
1681 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1682 new FiniteDuration(10000, TimeUnit.SECONDS));
1684 leaderActorContext.setReplicatedLog(
1685 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1687 leader = new Leader(leaderActorContext);
1688 leaderActor.underlyingActor().setBehavior(leader);
1689 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1691 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1692 AppendEntriesReply.class);
1694 assertEquals(false, appendEntriesReply.isSuccess());
1695 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1697 MessageCollectorActor.clearMessages(leaderActor);
1701 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1702 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1704 MockRaftActorContext leaderActorContext = createActorContext();
1705 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1706 new FiniteDuration(10000, TimeUnit.SECONDS));
1708 leaderActorContext.setReplicatedLog(
1709 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1710 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1712 leader = new Leader(leaderActorContext);
1713 leaderActor.underlyingActor().setBehavior(leader);
1714 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1716 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1717 AppendEntriesReply.class);
1719 assertEquals(false, appendEntriesReply.isSuccess());
1720 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1722 MessageCollectorActor.clearMessages(leaderActor);
1726 public void testHandleAppendEntriesReplySuccess() {
1727 logStart("testHandleAppendEntriesReplySuccess");
1729 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1731 leaderActorContext.setReplicatedLog(
1732 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1734 leaderActorContext.setCommitIndex(1);
1735 leaderActorContext.setLastApplied(1);
1736 leaderActorContext.getTermInformation().update(1, "leader");
1738 leader = new Leader(leaderActorContext);
1740 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1742 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1743 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1745 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1747 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1749 assertEquals(RaftState.Leader, raftActorBehavior.state());
1751 assertEquals(2, leaderActorContext.getCommitIndex());
1753 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1754 leaderActor, ApplyJournalEntries.class);
1756 assertEquals(2, leaderActorContext.getLastApplied());
1758 assertEquals(2, applyJournalEntries.getToIndex());
1760 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1763 assertEquals(1,applyStateList.size());
1765 ApplyState applyState = applyStateList.get(0);
1767 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1769 assertEquals(2, followerInfo.getMatchIndex());
1770 assertEquals(3, followerInfo.getNextIndex());
1771 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1772 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1776 public void testHandleAppendEntriesReplyUnknownFollower() {
1777 logStart("testHandleAppendEntriesReplyUnknownFollower");
1779 MockRaftActorContext leaderActorContext = createActorContext();
1781 leader = new Leader(leaderActorContext);
1783 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1785 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1787 assertEquals(RaftState.Leader, raftActorBehavior.state());
1791 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1792 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1794 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1795 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1796 new FiniteDuration(1000, TimeUnit.SECONDS));
1797 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1799 leaderActorContext.setReplicatedLog(
1800 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1801 long leaderCommitIndex = 3;
1802 leaderActorContext.setCommitIndex(leaderCommitIndex);
1803 leaderActorContext.setLastApplied(leaderCommitIndex);
1805 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1806 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1807 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1808 final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1810 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1812 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1813 followerActorContext.setCommitIndex(-1);
1814 followerActorContext.setLastApplied(-1);
1816 Follower follower = new Follower(followerActorContext);
1817 followerActor.underlyingActor().setBehavior(follower);
1818 followerActorContext.setCurrentBehavior(follower);
1820 leader = new Leader(leaderActorContext);
1822 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1823 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1824 AppendEntriesReply.class);
1826 MessageCollectorActor.clearMessages(followerActor);
1827 MessageCollectorActor.clearMessages(leaderActor);
1829 // Verify initial AppendEntries sent with the leader's current commit index.
1830 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1831 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1832 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1834 leaderActor.underlyingActor().setBehavior(leader);
1835 leaderActorContext.setCurrentBehavior(leader);
1837 leader.handleMessage(followerActor, appendEntriesReply);
1839 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1840 AppendEntries.class, 2);
1841 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1843 appendEntries = appendEntriesList.get(0);
1844 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1845 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1846 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1848 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1849 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1850 appendEntries.getEntries().get(0).getData());
1851 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1852 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1853 appendEntries.getEntries().get(1).getData());
1855 appendEntries = appendEntriesList.get(1);
1856 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1857 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1858 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1860 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1861 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1862 appendEntries.getEntries().get(0).getData());
1863 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1864 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1865 appendEntries.getEntries().get(1).getData());
1867 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1868 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1870 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1872 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1873 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1877 public void testHandleRequestVoteReply() {
1878 logStart("testHandleRequestVoteReply");
1880 MockRaftActorContext leaderActorContext = createActorContext();
1882 leader = new Leader(leaderActorContext);
1884 // Should be a no-op.
1885 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1886 new RequestVoteReply(1, true));
1888 assertEquals(RaftState.Leader, raftActorBehavior.state());
1890 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1892 assertEquals(RaftState.Leader, raftActorBehavior.state());
1896 public void testIsolatedLeaderCheckNoFollowers() {
1897 logStart("testIsolatedLeaderCheckNoFollowers");
1899 MockRaftActorContext leaderActorContext = createActorContext();
1901 leader = new Leader(leaderActorContext);
1902 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1903 assertTrue(newBehavior instanceof Leader);
1907 public void testIsolatedLeaderCheckNoVotingFollowers() {
1908 logStart("testIsolatedLeaderCheckNoVotingFollowers");
1910 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1911 Follower follower = new Follower(followerActorContext);
1912 followerActor.underlyingActor().setBehavior(follower);
1914 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1915 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1916 new FiniteDuration(1000, TimeUnit.SECONDS));
1917 leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1919 leader = new Leader(leaderActorContext);
1920 leader.getFollower(FOLLOWER_ID).markFollowerActive();
1921 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1922 assertTrue("Expected Leader", newBehavior instanceof Leader);
1925 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
1926 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1927 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1929 MockRaftActorContext leaderActorContext = createActorContext();
1931 Map<String, String> peerAddresses = new HashMap<>();
1932 peerAddresses.put("follower-1", followerActor1.path().toString());
1933 peerAddresses.put("follower-2", followerActor2.path().toString());
1935 leaderActorContext.setPeerAddresses(peerAddresses);
1936 leaderActorContext.setRaftPolicy(raftPolicy);
1938 leader = new Leader(leaderActorContext);
1940 leader.markFollowerActive("follower-1");
1941 leader.markFollowerActive("follower-2");
1942 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1943 assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1945 // kill 1 follower and verify if that got killed
1946 final TestKit probe = new TestKit(getSystem());
1947 probe.watch(followerActor1);
1948 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1949 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1950 assertEquals(termMsg1.getActor(), followerActor1);
1952 leader.markFollowerInActive("follower-1");
1953 leader.markFollowerActive("follower-2");
1954 newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1955 assertTrue("Behavior not instance of Leader when majority of followers are active",
1956 newBehavior instanceof Leader);
1958 // kill 2nd follower and leader should change to Isolated leader
1959 followerActor2.tell(PoisonPill.getInstance(), null);
1960 probe.watch(followerActor2);
1961 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1962 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1963 assertEquals(termMsg2.getActor(), followerActor2);
1965 leader.markFollowerInActive("follower-2");
1966 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1970 public void testIsolatedLeaderCheckTwoFollowers() {
1971 logStart("testIsolatedLeaderCheckTwoFollowers");
1973 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1975 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1976 newBehavior instanceof IsolatedLeader);
1980 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() {
1981 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1983 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1985 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1986 newBehavior instanceof Leader);
1990 public void testLaggingFollowerStarvation() {
1991 logStart("testLaggingFollowerStarvation");
1993 String leaderActorId = actorFactory.generateActorId("leader");
1994 String follower1ActorId = actorFactory.generateActorId("follower");
1995 String follower2ActorId = actorFactory.generateActorId("follower");
1997 final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1998 final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
2000 MockRaftActorContext leaderActorContext =
2001 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
2003 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
2004 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
2005 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
2007 leaderActorContext.setConfigParams(configParams);
2009 leaderActorContext.setReplicatedLog(
2010 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
2012 Map<String, String> peerAddresses = new HashMap<>();
2013 peerAddresses.put(follower1ActorId,
2014 follower1Actor.path().toString());
2015 peerAddresses.put(follower2ActorId,
2016 follower2Actor.path().toString());
2018 leaderActorContext.setPeerAddresses(peerAddresses);
2019 leaderActorContext.getTermInformation().update(1, leaderActorId);
2021 leader = createBehavior(leaderActorContext);
2023 leaderActor.underlyingActor().setBehavior(leader);
2025 for (int i = 1; i < 6; i++) {
2026 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
2027 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
2028 new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
2029 assertTrue(newBehavior == leader);
2030 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
2033 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
2034 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
2036 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
2037 heartbeats.size() > 1);
2039 // Check if follower-2 got AppendEntries during this time and was not starved
2040 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
2042 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
2043 appendEntries.size() > 1);
2047 public void testReplicationConsensusWithNonVotingFollower() {
2048 logStart("testReplicationConsensusWithNonVotingFollower");
2050 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2051 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2052 new FiniteDuration(1000, TimeUnit.SECONDS));
2054 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2055 leaderActorContext.setCommitIndex(-1);
2056 leaderActorContext.setLastApplied(-1);
2058 String nonVotingFollowerId = "nonvoting-follower";
2059 ActorRef nonVotingFollowerActor = actorFactory.createActor(
2060 MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
2062 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2063 VotingState.NON_VOTING);
2065 leader = new Leader(leaderActorContext);
2066 leaderActorContext.setCurrentBehavior(leader);
2068 // Ignore initial heartbeats
2069 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2070 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2072 MessageCollectorActor.clearMessages(followerActor);
2073 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2074 MessageCollectorActor.clearMessages(leaderActor);
2076 // Send a Replicate message and wait for AppendEntries.
2077 sendReplicate(leaderActorContext, 0);
2079 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2080 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2082 // Send reply only from the voting follower and verify consensus via ApplyState.
2083 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2085 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2087 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2089 MessageCollectorActor.clearMessages(followerActor);
2090 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2091 MessageCollectorActor.clearMessages(leaderActor);
2093 // Send another Replicate message
2094 sendReplicate(leaderActorContext, 1);
2096 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2097 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2098 AppendEntries.class);
2099 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2100 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2102 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2103 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2105 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2107 // Send reply from the voting follower and verify consensus.
2108 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2110 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2114 public void testTransferLeadershipWithFollowerInSync() {
2115 logStart("testTransferLeadershipWithFollowerInSync");
2117 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2118 leaderActorContext.setLastApplied(-1);
2119 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2120 new FiniteDuration(1000, TimeUnit.SECONDS));
2121 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2123 leader = new Leader(leaderActorContext);
2124 leaderActorContext.setCurrentBehavior(leader);
2126 // Initial heartbeat
2127 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2128 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2129 MessageCollectorActor.clearMessages(followerActor);
2131 sendReplicate(leaderActorContext, 0);
2132 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2134 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2135 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2136 MessageCollectorActor.clearMessages(followerActor);
2138 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2139 leader.transferLeadership(mockTransferCohort);
2141 verify(mockTransferCohort, never()).transferComplete();
2142 doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2143 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2144 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2146 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2147 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2149 // Leader should force an election timeout
2150 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2152 verify(mockTransferCohort).transferComplete();
2156 public void testTransferLeadershipWithEmptyLog() {
2157 logStart("testTransferLeadershipWithEmptyLog");
2159 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2160 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2161 new FiniteDuration(1000, TimeUnit.SECONDS));
2162 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2164 leader = new Leader(leaderActorContext);
2165 leaderActorContext.setCurrentBehavior(leader);
2167 // Initial heartbeat
2168 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2169 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2170 MessageCollectorActor.clearMessages(followerActor);
2172 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2173 doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2174 leader.transferLeadership(mockTransferCohort);
2176 verify(mockTransferCohort, never()).transferComplete();
2177 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2178 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2180 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2181 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2183 // Leader should force an election timeout
2184 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2186 verify(mockTransferCohort).transferComplete();
2190 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2191 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2193 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2194 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2195 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2197 leader = new Leader(leaderActorContext);
2198 leaderActorContext.setCurrentBehavior(leader);
2200 // Initial heartbeat
2201 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2202 MessageCollectorActor.clearMessages(followerActor);
2204 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2205 doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2206 leader.transferLeadership(mockTransferCohort);
2208 verify(mockTransferCohort, never()).transferComplete();
2210 // Sync up the follower.
2211 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2212 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2213 MessageCollectorActor.clearMessages(followerActor);
2215 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2216 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2217 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2218 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2219 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2221 // Leader should force an election timeout
2222 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2224 verify(mockTransferCohort).transferComplete();
2228 public void testTransferLeadershipWithFollowerSyncTimeout() {
2229 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2231 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2232 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2233 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2234 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2235 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2237 leader = new Leader(leaderActorContext);
2238 leaderActorContext.setCurrentBehavior(leader);
2240 // Initial heartbeat
2241 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2242 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2243 MessageCollectorActor.clearMessages(followerActor);
2245 sendReplicate(leaderActorContext, 0);
2246 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2248 MessageCollectorActor.clearMessages(followerActor);
2250 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2251 leader.transferLeadership(mockTransferCohort);
2253 verify(mockTransferCohort, never()).transferComplete();
2255 // Send heartbeats to time out the transfer.
2256 for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2257 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2258 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2259 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2262 verify(mockTransferCohort).abortTransfer();
2263 verify(mockTransferCohort, never()).transferComplete();
2264 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2268 public void testReplicationWithPayloadSizeThatExceedsThreshold() {
2269 logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
2271 final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
2272 Arrays.asList(new SimpleReplicatedLogEntry(0, 1,
2273 new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
2274 final MockRaftActorContext.MockPayload largePayload =
2275 new MockRaftActorContext.MockPayload("large", serializedSize);
2277 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2278 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2279 new FiniteDuration(300, TimeUnit.MILLISECONDS));
2280 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
2281 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2282 leaderActorContext.setCommitIndex(-1);
2283 leaderActorContext.setLastApplied(-1);
2285 leader = new Leader(leaderActorContext);
2286 leaderActorContext.setCurrentBehavior(leader);
2288 // Send initial heartbeat reply so follower is marked active
2289 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2290 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2291 MessageCollectorActor.clearMessages(followerActor);
2293 // Send normal payload first to prime commit index.
2294 final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2295 sendReplicate(leaderActorContext, term, 0);
2297 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2298 assertEquals("Entries size", 1, appendEntries.getEntries().size());
2299 assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
2301 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
2302 assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
2303 MessageCollectorActor.clearMessages(followerActor);
2305 // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
2306 sendReplicate(leaderActorContext, term, 1, largePayload);
2308 MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2309 assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
2310 assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
2312 final Identifier slicingId = messageSlice.getIdentifier();
2314 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2315 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
2316 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
2317 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2318 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2319 MessageCollectorActor.clearMessages(followerActor);
2321 // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
2323 // Sleep for the heartbeat interval so AppendEntries is sent.
2324 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2325 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2327 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2329 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2330 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2331 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2332 MessageCollectorActor.clearMessages(followerActor);
2334 // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
2336 leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
2337 messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2338 assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
2340 leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
2342 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
2344 MessageCollectorActor.clearMessages(followerActor);
2346 // Send another normal payload.
2348 sendReplicate(leaderActorContext, term, 2);
2350 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2351 assertEquals("Entries size", 1, appendEntries.getEntries().size());
2352 assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
2353 assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
2357 public void testLargePayloadSlicingExpiration() {
2358 logStart("testLargePayloadSlicingExpiration");
2360 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2361 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2362 new FiniteDuration(100, TimeUnit.MILLISECONDS));
2363 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
2364 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
2365 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2366 leaderActorContext.setCommitIndex(-1);
2367 leaderActorContext.setLastApplied(-1);
2369 final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2370 leader = new Leader(leaderActorContext);
2371 leaderActorContext.setCurrentBehavior(leader);
2373 // Send initial heartbeat reply so follower is marked active
2374 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2375 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2376 MessageCollectorActor.clearMessages(followerActor);
2378 sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
2379 leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
2380 MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2382 // Sleep for at least 3 * election timeout so the slicing state expires.
2383 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2384 .getElectionTimeOutInterval().toMillis() * 3 + 50, TimeUnit.MILLISECONDS);
2385 MessageCollectorActor.clearMessages(followerActor);
2387 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2389 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2390 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2391 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2393 MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
2394 MessageCollectorActor.clearMessages(followerActor);
2396 // Send an AppendEntriesReply - this should restart the slicing.
2398 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2399 .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
2401 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
2403 MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2407 public void testLeaderAddressInAppendEntries() {
2408 logStart("testLeaderAddressInAppendEntries");
2410 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2411 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2412 FiniteDuration.create(50, TimeUnit.MILLISECONDS));
2413 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2414 leaderActorContext.setCommitIndex(-1);
2415 leaderActorContext.setLastApplied(-1);
2417 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
2418 peerId -> leaderActor.path().toString());
2420 leader = new Leader(leaderActorContext);
2421 leaderActorContext.setCurrentBehavior(leader);
2423 // Initial heartbeat shouldn't have the leader address
2425 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2426 assertFalse(appendEntries.getLeaderAddress().isPresent());
2427 MessageCollectorActor.clearMessages(followerActor);
2429 // Send AppendEntriesReply indicating the follower needs the leader address
2431 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
2432 RaftVersions.CURRENT_VERSION));
2434 // Sleep for the heartbeat interval so AppendEntries is sent.
2435 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2436 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2438 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2440 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2441 assertTrue(appendEntries.getLeaderAddress().isPresent());
2442 assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get());
2443 MessageCollectorActor.clearMessages(followerActor);
2445 // Send AppendEntriesReply indicating the follower does not need the leader address
2447 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
2448 RaftVersions.CURRENT_VERSION));
2450 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2451 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2453 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2455 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2456 assertFalse(appendEntries.getLeaderAddress().isPresent());
2460 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
2461 final ActorRef actorRef, final RaftRPC rpc) {
2462 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2463 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2466 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2468 private final long electionTimeOutIntervalMillis;
2469 private final int snapshotChunkSize;
2471 MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
2472 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2473 this.snapshotChunkSize = snapshotChunkSize;
2477 public FiniteDuration getElectionTimeOutInterval() {
2478 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2482 public int getSnapshotChunkSize() {
2483 return snapshotChunkSize;