2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.verify;
22 import akka.actor.ActorRef;
23 import akka.actor.PoisonPill;
24 import akka.actor.Props;
25 import akka.actor.Terminated;
26 import akka.testkit.TestActorRef;
27 import akka.testkit.javadsl.TestKit;
28 import com.google.common.base.Optional;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.io.ByteSource;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import com.google.protobuf.ByteString;
33 import java.io.IOException;
34 import java.io.OutputStream;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.apache.commons.lang3.SerializationUtils;
43 import org.junit.After;
44 import org.junit.Test;
45 import org.opendaylight.controller.cluster.messaging.MessageSlice;
46 import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
47 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
48 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
49 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
50 import org.opendaylight.controller.cluster.raft.RaftActorContext;
51 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
52 import org.opendaylight.controller.cluster.raft.RaftState;
53 import org.opendaylight.controller.cluster.raft.RaftVersions;
54 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
55 import org.opendaylight.controller.cluster.raft.VotingState;
56 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
57 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
58 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
59 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
60 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
61 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
62 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
63 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
64 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
65 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
66 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
67 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
68 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
69 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
70 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
71 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
72 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
73 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
74 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
75 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
76 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
77 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
78 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
79 import org.opendaylight.yangtools.concepts.Identifier;
80 import scala.concurrent.duration.FiniteDuration;
82 public class LeaderTest extends AbstractLeaderTest<Leader> {
// Ids used by the tests in this class for the follower peer and for the leader.
84 static final String FOLLOWER_ID = "follower";
85 public static final String LEADER_ID = "leader";
// Test actor that stands in for the leader. NOTE(review): actorFactory is not declared in the lines visible
// here - presumably it is inherited from the base test class; confirm.
87 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
88 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
// Test actor that stands in for the follower; the tests read the messages sent to it via MessageCollectorActor.
90 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
91 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
// Leader behavior under test; each test method assigns its own instance.
93 private Leader leader;
// Payload version set on the actor context and expected in outgoing AppendEntries by the heartbeat test.
94 private final short payloadVersion = 5;
// Per-test teardown hook. NOTE(review): the body is not visible in this excerpt; presumably it releases the
// leader created by the test - confirm against the full file.
98 public void tearDown() {
// Verifies that Leader.handleMessage returns null for a message it does not recognize (here a plain String).
107 public void testHandleMessageForUnknownMessage() {
108 logStart("testHandleMessageForUnknownMessage");
110 leader = new Leader(createActorContext());
112 // handleMessage should return null when it receives an unknown message
113 assertNull(leader.handleMessage(followerActor, "foo"));
// Verifies that a new Leader immediately sends an empty AppendEntries to its follower and that, once the
// follower has replied and a SendHeartBeat is handled, it sends an AppendEntries carrying the last log entry.
// NOTE(review): 'term' is not declared in the lines visible here.
117 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
118 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
120 MockRaftActorContext actorContext = createActorContextWithFollower();
121 actorContext.setCommitIndex(-1);
122 actorContext.setPayloadVersion(payloadVersion);
125 actorContext.getTermInformation().update(term, "");
127 leader = new Leader(actorContext);
128 actorContext.setCurrentBehavior(leader);
130 // Leader should send an immediate heartbeat with no entries as follower is inactive.
131 final long lastIndex = actorContext.getReplicatedLog().lastIndex();
132 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
133 assertEquals("getTerm", term, appendEntries.getTerm());
134 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
135 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
136 assertEquals("Entries size", 0, appendEntries.getEntries().size());
137 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
139 // The follower would normally reply - simulate that explicitly here.
// The reply reports lastIndex - 1 as the follower's last log index, so the entry at lastIndex is still
// outstanding for it; the assertions after the heartbeat below expect exactly that entry.
140 leader.handleMessage(followerActor, new AppendEntriesReply(
141 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
142 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so the next expectFirstMatching only sees newly sent messages.
144 followerActor.underlyingActor().clear();
146 // Sleep for the heartbeat interval so AppendEntries is sent.
147 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
148 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
150 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
152 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
153 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
154 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
155 assertEquals("Entries size", 1, appendEntries.getEntries().size());
156 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
157 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
158 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
// Convenience overload: replicates a new entry at the given index using term 1.
162 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
163 return sendReplicate(actorContext, 1, index);
// Convenience overload: replicates a MockPayload("foo") entry at the given term and index.
166 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
167 return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
// Appends a new entry with the given index, term and payload to the context's replicated log, then passes a
// Replicate message for that entry to the leader (with leaderActor as sender) and returns the behavior that
// handleMessage yields. The Replicate is built with null for its first two constructor arguments.
170 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) {
171 SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
172 actorContext.getReplicatedLog().append(newEntry);
173 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
// Verifies that, after the follower has acknowledged the leader's current last index, handling a Replicate for
// a new entry sends the follower an AppendEntries containing exactly that entry, and that the leader stays Leader.
// NOTE(review): 'term' is not declared in the lines visible here.
177 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
178 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
180 MockRaftActorContext actorContext = createActorContextWithFollower();
183 actorContext.getTermInformation().update(term, "");
185 leader = new Leader(actorContext);
187 // Leader will send an immediate heartbeat - ignore it.
188 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
190 // The follower would normally reply - simulate that explicitly here.
191 long lastIndex = actorContext.getReplicatedLog().lastIndex();
192 leader.handleMessage(followerActor, new AppendEntriesReply(
193 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
194 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so only the AppendEntries triggered by the Replicate is examined.
196 followerActor.underlyingActor().clear();
198 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
200 // State should not change
201 assertTrue(raftBehavior instanceof Leader);
203 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
204 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
205 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
206 assertEquals("Entries size", 1, appendEntries.getEntries().size());
207 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
208 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
209 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// No reply for the new entry has been simulated yet, so the commit index is expected to still be lastIndex.
210 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
// Verifies that the leader does not advance the commit index for entries from a previous term on a follower
// acknowledgement alone, and that it advances the commit index once an entry from the current term is
// acknowledged.
214 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() {
215 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
217 MockRaftActorContext actorContext = createActorContextWithFollower();
218 actorContext.setCommitIndex(-1);
219 actorContext.setLastApplied(-1);
221 // The raft context is initialized with a couple log entries. However the commitIndex
222 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
223 // committed and applied. Now it regains leadership with a higher term (2).
224 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
225 long newTerm = prevTerm + 1;
226 actorContext.getTermInformation().update(newTerm, "");
228 leader = new Leader(actorContext);
229 actorContext.setCurrentBehavior(leader);
231 // Leader will send an immediate heartbeat - ignore it.
232 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
234 // The follower replies with the leader's current last index and term, simulating that it is
235 // up to date with the leader.
236 long lastIndex = actorContext.getReplicatedLog().lastIndex();
237 leader.handleMessage(followerActor, new AppendEntriesReply(
238 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
240 // The commit index should not get updated even though consensus was reached. This is b/c the
241 // last entry's term does not match the current term. As per §5.4.1, "Raft never commits log entries
242 // from previous terms by counting replicas".
243 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
245 followerActor.underlyingActor().clear();
247 // Now replicate a new entry with the new term 2.
248 long newIndex = lastIndex + 1;
249 sendReplicate(actorContext, newTerm, newIndex);
251 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
252 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
253 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
254 assertEquals("Entries size", 1, appendEntries.getEntries().size());
255 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
256 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
257 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
259 // The follower replies with success. The leader should now update the commit index to the new index
260 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
261 // prior entries are committed indirectly".
262 leader.handleMessage(followerActor, new AppendEntriesReply(
263 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
265 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
// Verifies that, with the raft policy installed below, the commit index already equals the new entry's index
// right after the Replicate is handled, i.e. before any follower reply for that entry has been simulated.
// NOTE(review): the meaning of the two createRaftPolicy(true, true) flags is not visible here - confirm.
// NOTE(review): 'term' is not declared in the lines visible here.
269 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() {
270 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
272 MockRaftActorContext actorContext = createActorContextWithFollower();
273 actorContext.setRaftPolicy(createRaftPolicy(true, true));
276 actorContext.getTermInformation().update(term, "");
278 leader = new Leader(actorContext);
280 // Leader will send an immediate heartbeat - ignore it.
281 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
283 // The follower would normally reply - simulate that explicitly here.
284 long lastIndex = actorContext.getReplicatedLog().lastIndex();
285 leader.handleMessage(followerActor, new AppendEntriesReply(
286 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
287 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so only the AppendEntries triggered by the Replicate is examined.
289 followerActor.underlyingActor().clear();
291 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
293 // State should not change
294 assertTrue(raftBehavior instanceof Leader);
296 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
297 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
298 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
299 assertEquals("Entries size", 1, appendEntries.getEntries().size());
300 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
301 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
302 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// Unlike the default-policy test, the commit index is expected to have advanced to the new entry already.
303 assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
// Verifies that several Replicate messages handled back-to-back, without any follower reply in between and
// within a single heartbeat interval, result in only one AppendEntries being sent to the follower.
// NOTE(review): 'term' is not declared in the lines visible here.
307 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
308 logStart("testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent");
310 MockRaftActorContext actorContext = createActorContextWithFollower();
// Use a long (5 second) heartbeat interval so that it cannot elapse while the test runs.
311 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
313 public FiniteDuration getHeartBeatInterval() {
314 return FiniteDuration.apply(5, TimeUnit.SECONDS);
319 actorContext.getTermInformation().update(term, "");
321 leader = new Leader(actorContext);
323 // Leader will send an immediate heartbeat - ignore it.
324 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
326 // The follower would normally reply - simulate that explicitly here.
327 long lastIndex = actorContext.getReplicatedLog().lastIndex();
328 leader.handleMessage(followerActor, new AppendEntriesReply(
329 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
330 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so that only messages caused by the Replicate calls are counted.
332 followerActor.underlyingActor().clear();
// Replicate five consecutive new entries without simulating any follower reply.
334 for (int i = 0; i < 5; i++) {
335 sendReplicate(actorContext, lastIndex + i + 1);
338 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
339 // We expect only 1 message to be sent because of two reasons,
340 // - an append entries reply was not received
341 // - the heartbeat interval has not expired
342 // In this scenario if multiple messages are sent they would likely be duplicates
343 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
// Verifies that each Replicate that follows a follower acknowledgement produces a new AppendEntries, while a
// Replicate issued without a preceding acknowledgement (and within the heartbeat interval) does not.
// NOTE(review): 'term' is not declared in the lines visible here.
347 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
348 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
350 MockRaftActorContext actorContext = createActorContextWithFollower();
// Use a long (5 second) heartbeat interval so that it cannot elapse while the test runs.
351 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
353 public FiniteDuration getHeartBeatInterval() {
354 return FiniteDuration.apply(5, TimeUnit.SECONDS);
359 actorContext.getTermInformation().update(term, "");
361 leader = new Leader(actorContext);
363 // Leader will send an immediate heartbeat - ignore it.
364 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
366 // The follower would normally reply - simulate that explicitly here.
367 long lastIndex = actorContext.getReplicatedLog().lastIndex();
368 leader.handleMessage(followerActor, new AppendEntriesReply(
369 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
370 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so that only messages caused by the Replicate calls are counted.
372 followerActor.underlyingActor().clear();
// First three entries: each Replicate is followed by a successful reply acknowledging that entry.
374 for (int i = 0; i < 3; i++) {
375 sendReplicate(actorContext, lastIndex + i + 1);
376 leader.handleMessage(followerActor, new AppendEntriesReply(
377 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
// Last two entries: replicated without any follower reply.
381 for (int i = 3; i < 5; i++) {
382 sendReplicate(actorContext, lastIndex + i + 1);
385 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
386 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
387 // get sent to the follower - but not the 5th
388 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
// The i-th collected message must start with the i-th replicated entry. The expected index is derived from
// lastIndex (rather than hard-coded) and passed first, as JUnit's assertEquals(expected, actual) requires.
390 for (int i = 0; i < 4; i++) {
391 long actual = allMessages.get(i).getEntries().get(0).getIndex();
392 assertEquals(lastIndex + i + 1, actual);
// Verifies that an entry which was sent to the follower but never acknowledged is sent again when a
// SendHeartBeat is handled after the heartbeat interval has elapsed.
// NOTE(review): 'term' is not declared in the lines visible here.
397 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
398 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
400 MockRaftActorContext actorContext = createActorContextWithFollower();
// Heartbeat interval of 500 ms; the test sleeps 750 ms below so that the interval has elapsed.
401 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
403 public FiniteDuration getHeartBeatInterval() {
404 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
409 actorContext.getTermInformation().update(term, "");
411 leader = new Leader(actorContext);
413 // Leader will send an immediate heartbeat - ignore it.
414 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
416 // The follower would normally reply - simulate that explicitly here.
417 long lastIndex = actorContext.getReplicatedLog().lastIndex();
418 leader.handleMessage(followerActor, new AppendEntriesReply(
419 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
420 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so that only the two AppendEntries of interest are collected.
422 followerActor.underlyingActor().clear();
424 sendReplicate(actorContext, lastIndex + 1);
426 // Wait slightly longer than heartbeat duration
427 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
429 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
431 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
432 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// Both the original send and the heartbeat-triggered resend must carry the same single entry.
434 assertEquals(1, allMessages.get(0).getEntries().size());
435 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
436 assertEquals(1, allMessages.get(1).getEntries().size());
// Check the second message here; the first one was already verified above.
437 assertEquals(lastIndex + 1, allMessages.get(1).getEntries().get(0).getIndex());
// Verifies that every SendHeartBeat handled after the heartbeat interval has elapsed produces an AppendEntries
// to the follower: three heartbeats, each preceded by a sleep longer than the interval, yield three messages.
// NOTE(review): 'term' is not declared in the lines visible here.
442 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
443 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
445 MockRaftActorContext actorContext = createActorContextWithFollower();
// Heartbeat interval of 100 ms; the loop below sleeps 150 ms before each heartbeat.
446 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
448 public FiniteDuration getHeartBeatInterval() {
449 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
454 actorContext.getTermInformation().update(term, "");
456 leader = new Leader(actorContext);
458 // Leader will send an immediate heartbeat - ignore it.
459 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
461 // The follower would normally reply - simulate that explicitly here.
462 long lastIndex = actorContext.getReplicatedLog().lastIndex();
463 leader.handleMessage(followerActor, new AppendEntriesReply(
464 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
465 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so that only the three heartbeats below are counted.
467 followerActor.underlyingActor().clear();
469 for (int i = 0; i < 3; i++) {
470 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
471 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
474 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
475 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
// Verifies that a Replicate handled directly after a heartbeat is still sent to the follower: the heartbeat
// yields an empty AppendEntries and the Replicate yields a second AppendEntries carrying one entry.
// NOTE(review): 'term' is not declared in the lines visible here.
479 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
480 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
482 MockRaftActorContext actorContext = createActorContextWithFollower();
// Heartbeat interval of 100 ms; the test sleeps 150 ms before sending the heartbeat.
483 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
485 public FiniteDuration getHeartBeatInterval() {
486 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
491 actorContext.getTermInformation().update(term, "");
493 leader = new Leader(actorContext);
495 // Leader will send an immediate heartbeat - ignore it.
496 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
498 // The follower would normally reply - simulate that explicitly here.
499 long lastIndex = actorContext.getReplicatedLog().lastIndex();
500 leader.handleMessage(followerActor, new AppendEntriesReply(
501 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
502 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so that only the two messages below are collected.
504 followerActor.underlyingActor().clear();
506 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
507 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
508 sendReplicate(actorContext, lastIndex + 1);
510 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
511 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// First message is the heartbeat (no entries); second is the replication of the new entry.
513 assertEquals(0, allMessages.get(0).getEntries().size());
514 assertEquals(1, allMessages.get(1).getEntries().size());
// Verifies that a leader whose context was created without a follower sets the commit index to the new entry's
// index as soon as the Replicate is handled, and that ApplyState messages are delivered to leaderActor for the
// entries after lastApplied, the last one carrying the Replicate's identifier.
519 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
520 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
522 MockRaftActorContext actorContext = createActorContext();
524 leader = new Leader(actorContext);
526 actorContext.setLastApplied(0);
528 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
529 long term = actorContext.getTermInformation().getCurrentTerm();
530 ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
531 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
533 actorContext.getReplicatedLog().append(newEntry);
535 final Identifier id = new MockIdentifier("state-id");
536 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
537 new Replicate(leaderActor, id, newEntry, true));
539 // State should not change
540 assertTrue(raftBehavior instanceof Leader);
542 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
544 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
545 // one since lastApplied state is 0.
546 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
547 leaderActor, ApplyState.class);
548 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
// The i-th ApplyState is expected to refer to log index i + 1 (indices 1..newLogIndex).
550 for (int i = 0; i <= newLogIndex - 1; i++) {
551 ApplyState applyState = applyStateList.get(i);
552 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
553 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
// The final ApplyState must correspond to the newly replicated entry and carry its identifier.
556 ApplyState last = applyStateList.get((int) newLogIndex - 1);
557 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
558 assertEquals("getIdentifier", id, last.getIdentifier());
// Verifies the leader's heartbeat behavior while a snapshot install to the follower is in progress: while the
// first chunk is unacknowledged a heartbeat sends an empty AppendEntries, and after the chunk is marked as
// acknowledged the next heartbeat sends an InstallSnapshot.
562 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
563 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
565 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Snapshot content that is serialized below and used as the leader's snapshot bytes.
567 Map<String, String> leadersSnapshot = new HashMap<>();
568 leadersSnapshot.put("1", "A");
569 leadersSnapshot.put("2", "B");
570 leadersSnapshot.put("3", "C");
// Remove all entries from the leader's log (from index 0 onwards).
573 actorContext.getReplicatedLog().removeFrom(0);
575 final int commitIndex = 3;
576 final int snapshotIndex = 2;
577 final int snapshotTerm = 1;
579 // set the snapshot variables in replicatedlog
580 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
581 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
582 actorContext.setCommitIndex(commitIndex);
583 //set follower timeout to 2 mins, helps during debugging
584 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
586 leader = new Leader(actorContext);
// Put the follower at the very beginning of the log: nothing matched yet, next index 0.
588 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
589 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
591 //update follower timestamp
592 leader.markFollowerActive(FOLLOWER_ID);
// Give the leader a snapshot holder and attach install-snapshot state, built from the same bytes, to the
// follower so that an install appears to be in progress.
594 ByteString bs = toByteString(leadersSnapshot);
595 leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
596 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
597 -1, null, null), ByteSource.wrap(bs.toByteArray())));
598 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
599 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
600 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
601 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
603 //send first chunk and no InstallSnapshotReply received yet
605 fts.incrementChunkIndex();
607 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
608 TimeUnit.MILLISECONDS);
610 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
612 AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
614 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
616 //InstallSnapshotReply received
617 fts.markSendStatus(true);
619 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
621 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
// The InstallSnapshot must describe the snapshot set on the holder above (last included index == commitIndex).
623 assertEquals(commitIndex, is.getLastIncludedIndex());
// Verifies that replicating a new entry while the leader's log has been emptied and its snapshot index is set
// ahead of the follower leaves the leader in the Leader state and starts a snapshot capture.
627 public void testSendAppendEntriesSnapshotScenario() {
628 logStart("testSendAppendEntriesSnapshotScenario");
630 final MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): leadersSnapshot is populated but not referenced again in the lines visible here.
632 Map<String, String> leadersSnapshot = new HashMap<>();
633 leadersSnapshot.put("1", "A");
634 leadersSnapshot.put("2", "B");
635 leadersSnapshot.put("3", "C");
// Remove all entries from the leader's log (from index 0 onwards).
638 actorContext.getReplicatedLog().removeFrom(0);
640 final int followersLastIndex = 2;
641 final int snapshotIndex = 3;
642 final int newEntryIndex = 4;
643 final int snapshotTerm = 1;
644 final int currentTerm = 2;
646 // set the snapshot variables in replicatedlog
647 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
648 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
649 actorContext.setCommitIndex(followersLastIndex);
651 leader = new Leader(actorContext);
653 // Leader will send an immediate heartbeat - ignore it.
654 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// New log entry at index 4 in the current term, appended to the leader's log before being replicated.
657 SimpleReplicatedLogEntry entry =
658 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
659 new MockRaftActorContext.MockPayload("D"));
661 actorContext.getReplicatedLog().append(entry);
663 //update follower timestamp
664 leader.markFollowerActive(FOLLOWER_ID);
666 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
667 RaftActorBehavior raftBehavior = leader.handleMessage(
668 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
670 assertTrue(raftBehavior instanceof Leader);
// The snapshot manager is expected to be capturing a snapshot at this point.
672 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
// Verifies that, with no snapshot held by the leader, replicating an entry starts a snapshot capture with the
// expected last-applied and last-entry index/term values, and that a second Replicate issued while that
// capture is in progress does not replace the CaptureSnapshot instance.
676 public void testInitiateInstallSnapshot() {
677 logStart("testInitiateInstallSnapshot");
679 MockRaftActorContext actorContext = createActorContextWithFollower();
// Remove all entries from the leader's log (from index 0 onwards).
682 actorContext.getReplicatedLog().removeFrom(0);
684 final int followersLastIndex = 2;
685 final int snapshotIndex = 3;
686 final int newEntryIndex = 4;
687 final int snapshotTerm = 1;
688 final int currentTerm = 2;
690 // set the snapshot variables in replicatedlog
691 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
692 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
693 actorContext.setLastApplied(3);
694 actorContext.setCommitIndex(followersLastIndex);
696 leader = new Leader(actorContext);
698 // Leader will send an immediate heartbeat - ignore it.
699 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
701 // set the snapshot as absent and check if capture-snapshot is invoked.
702 leader.setSnapshotHolder(null);
// New log entry at index 4 in the current term, appended to the leader's log before being replicated.
705 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
706 new MockRaftActorContext.MockPayload("D"));
708 actorContext.getReplicatedLog().append(entry);
710 //update follower timestamp
711 leader.markFollowerActive(FOLLOWER_ID);
713 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
715 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
717 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
// Expected values mirror the setup above: lastApplied 3 / snapshotTerm 1, new entry index 4 / currentTerm 2.
719 assertEquals(3, cs.getLastAppliedIndex());
720 assertEquals(1, cs.getLastAppliedTerm());
721 assertEquals(4, cs.getLastIndex());
722 assertEquals(2, cs.getLastTerm());
724 // if an initiate is started again when first is in progress, it shouldn't initiate Capture
725 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
727 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
// Verifies the forced snapshot path end to end:
//  1. an AppendEntriesReply that (per the comment below) forces an install snapshot makes the leader start
//     a snapshot capture;
//  2. further triggers (Replicate or another such reply) while that capture is in progress keep the same
//     CaptureSnapshot instance;
//  3. once the snapshot is persisted an InstallSnapshot is sent to the follower, and yet another forcing
//     reply does not cause an InstallSnapshot to be sent again.
731 public void testInitiateForceInstallSnapshot() throws Exception {
732 logStart("testInitiateForceInstallSnapshot");
734 MockRaftActorContext actorContext = createActorContextWithFollower();
736 final int followersLastIndex = 2;
737 final int snapshotIndex = -1;
738 final int newEntryIndex = 4;
739 final int snapshotTerm = -1;
740 final int currentTerm = 2;
742 // Set the snapshot variables in the replicated log (-1/-1 here, i.e. no snapshot index/term recorded).
743 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
744 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
745 actorContext.setLastApplied(3);
746 actorContext.setCommitIndex(followersLastIndex);
// Presumably clears any entries the mock context starts with; the log is repopulated further below.
748 actorContext.getReplicatedLog().removeFrom(0);
// Record the stream handed to the create-snapshot procedure so the test can later write the snapshot bytes
// to it and assert that the procedure was invoked at all.
750 AtomicReference<java.util.Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
751 actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
753 leader = new Leader(actorContext);
754 actorContext.setCurrentBehavior(leader);
756 // Leader will send an immediate heartbeat - ignore it.
757 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
759 // Set the snapshot as absent and check if capture-snapshot is invoked.
760 leader.setSnapshotHolder(null);
// Entries 0..3 are appended with term 1.
762 for (int i = 0; i < 4; i++) {
763 actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
764 new MockRaftActorContext.MockPayload("X" + i)));
// One more entry at index 4 with term 2; it becomes the log's last entry.
768 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
769 new MockRaftActorContext.MockPayload("D"));
771 actorContext.getReplicatedLog().append(entry);
773 // Update the follower's activity timestamp.
774 leader.markFollowerActive(FOLLOWER_ID);
776 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
777 // installed with a SendInstallSnapshot
778 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
779 RaftVersions.CURRENT_VERSION));
781 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
// The capture must reflect lastApplied = 3 (a term-1 entry) and the last log entry (index 4, term 2).
783 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
784 assertEquals(3, cs.getLastAppliedIndex());
785 assertEquals(1, cs.getLastAppliedTerm());
786 assertEquals(4, cs.getLastIndex());
787 assertEquals(2, cs.getLastTerm());
789 assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
790 assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
792 MessageCollectorActor.clearMessages(followerActor);
794 // Sending Replicate message should not initiate another capture since the first is in progress.
795 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
796 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
798 // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
799 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
800 RaftVersions.CURRENT_VERSION));
801 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
803 // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
804 final byte[] bytes = new byte[]{1, 2, 3};
805 installSnapshotStream.get().get().write(bytes);
806 actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
807 Runtime.getRuntime().totalMemory());
808 MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
810 // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
811 MessageCollectorActor.clearMessages(followerActor);
812 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
813 RaftVersions.CURRENT_VERSION));
// Watch the follower for 200 (presumably milliseconds - TODO confirm) and require that no InstallSnapshot shows up.
814 MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
// Verifies that handling a SendInstallSnapshot message keeps the Leader behavior and sends the follower an
// InstallSnapshot whose data is non-null, whose last-included index/term match the values the snapshot was
// created with, and whose term is the leader's current term.
819 public void testInstallSnapshot() {
820 logStart("testInstallSnapshot");
822 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Arbitrary map that is serialized below to serve as the snapshot payload.
824 Map<String, String> leadersSnapshot = new HashMap<>();
825 leadersSnapshot.put("1", "A");
826 leadersSnapshot.put("2", "B");
827 leadersSnapshot.put("3", "C");
// Presumably clears any entries the mock context starts with - TODO confirm removeFrom semantics.
830 actorContext.getReplicatedLog().removeFrom(0);
832 final int lastAppliedIndex = 3;
833 final int snapshotIndex = 2;
834 final int snapshotTerm = 1;
835 final int currentTerm = 2;
837 // Set the snapshot variables in the replicated log and make term 2 the leader's current term.
838 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
839 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
840 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
841 actorContext.setCommitIndex(lastAppliedIndex);
842 actorContext.setLastApplied(lastAppliedIndex);
844 leader = new Leader(actorContext);
846 // Initial heartbeat.
847 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Put the follower's tracked state at matchIndex = -1 / nextIndex = 0.
849 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
850 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
852 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
853 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
854 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
856 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
857 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
// The leader must not switch behavior as a result of the message.
859 assertTrue(raftBehavior instanceof Leader);
861 // Check that InstallSnapshot was sent to the follower with the expected values.
863 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
864 InstallSnapshot.class);
866 assertNotNull(installSnapshot.getData());
867 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
868 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
870 assertEquals(currentTerm, installSnapshot.getTerm());
// Same scenario as the plain install-snapshot test, but with the replicated log's snapshot index/term at -1
// and the follower's next index at -1. A SendInstallSnapshot must still keep the Leader behavior and produce
// an InstallSnapshot with non-null data, the snapshot's last-included index/term and the leader's current term.
874 public void testForceInstallSnapshot() {
875 logStart("testForceInstallSnapshot");
877 final MockRaftActorContext actorContext = createActorContextWithFollower();
// Arbitrary map that is serialized below to serve as the snapshot payload.
879 Map<String, String> leadersSnapshot = new HashMap<>();
880 leadersSnapshot.put("1", "A");
881 leadersSnapshot.put("2", "B");
882 leadersSnapshot.put("3", "C");
884 final int lastAppliedIndex = 3;
885 final int snapshotIndex = -1;
886 final int snapshotTerm = -1;
887 final int currentTerm = 2;
889 // Set the snapshot variables in the replicated log and make term 2 the leader's current term.
890 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
891 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
892 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
893 actorContext.setCommitIndex(lastAppliedIndex);
894 actorContext.setLastApplied(lastAppliedIndex);
896 leader = new Leader(actorContext);
898 // Initial heartbeat.
899 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Both match index and next index are -1 in this variant.
901 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
902 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
904 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
905 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
906 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
908 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
909 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
// The leader must not switch behavior as a result of the message.
911 assertTrue(raftBehavior instanceof Leader);
913 // Check that InstallSnapshot was sent to the follower with the expected values.
915 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
916 InstallSnapshot.class);
918 assertNotNull(installSnapshot.getData());
919 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
920 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
922 assertEquals(currentTerm, installSnapshot.getTerm());
// Verifies the leader's bookkeeping when an InstallSnapshotReply for the final chunk is handled: the
// follower's install-snapshot state is cleared, its match index becomes the snapshot's index (commitIndex
// here), its next index becomes commitIndex + 1, and the leader no longer holds the snapshot.
926 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
927 logStart("testHandleInstallSnapshotReplyLastChunk");
929 MockRaftActorContext actorContext = createActorContextWithFollower();
931 final int commitIndex = 3;
932 final int snapshotIndex = 2;
933 final int snapshotTerm = 1;
934 final int currentTerm = 2;
936 actorContext.setCommitIndex(commitIndex);
938 leader = new Leader(actorContext);
939 actorContext.setCurrentBehavior(leader);
// Put the follower's tracked state at matchIndex = -1 / nextIndex = 0.
941 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
942 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
944 // Ignore initial heartbeat.
945 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Arbitrary map that is serialized below to serve as the snapshot payload.
947 Map<String, String> leadersSnapshot = new HashMap<>();
948 leadersSnapshot.put("1", "A");
949 leadersSnapshot.put("2", "B");
950 leadersSnapshot.put("3", "C");
952 // Set the snapshot variables in the replicated log and make term 2 the leader's current term.
954 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
955 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
956 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
// Give the leader a snapshot to install and attach a matching install state to the follower, as if an
// install were already under way.
958 ByteString bs = toByteString(leadersSnapshot);
959 leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
960 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
961 -1, null, null), ByteSource.wrap(bs.toByteArray())));
962 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
963 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
964 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
965 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
// Advance the install state until its chunk index is the last chunk.
966 while (!fts.isLastChunk(fts.getChunkIndex())) {
968 fts.incrementChunkIndex();
// Presumably empties the replicated log before the reply is handled - TODO confirm removeFrom semantics.
972 actorContext.getReplicatedLog().removeFrom(0);
// Reply for the last chunk index; the trailing 'true' is presumably the success flag - TODO confirm.
974 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
975 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
977 assertTrue(raftBehavior instanceof Leader);
979 assertEquals(1, leader.followerLogSize());
980 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
982 assertNull(fli.getInstallSnapshotState());
983 assertEquals(commitIndex, fli.getMatchIndex());
984 assertEquals(commitIndex + 1, fli.getNextIndex());
985 assertFalse(leader.hasSnapshot());
// Verifies chunk-by-chunk snapshot transfer driven by InstallSnapshotReply messages: the first
// InstallSnapshot is chunk 1 of 3, each reply for the chunk just sent triggers the next InstallSnapshot,
// and a reply to the final one does not produce any further InstallSnapshot.
989 public void testSendSnapshotfromInstallSnapshotReply() {
990 logStart("testSendSnapshotfromInstallSnapshotReply");
992 MockRaftActorContext actorContext = createActorContextWithFollower();
994 final int commitIndex = 3;
995 final int snapshotIndex = 2;
996 final int snapshotTerm = 1;
997 final int currentTerm = 2;
// Config with an overridden snapshot chunk size; the assertions below expect the payload to be split
// into 3 chunks.
999 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
1001 public int getSnapshotChunkSize() {
// Long heartbeat / isolated-leader-check intervals, presumably so that timer-driven messages do not
// interfere with the message assertions below - TODO confirm.
1005 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1006 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1008 actorContext.setConfigParams(configParams);
1009 actorContext.setCommitIndex(commitIndex);
1011 leader = new Leader(actorContext);
1012 actorContext.setCurrentBehavior(leader);
// Put the follower's tracked state at matchIndex = -1 / nextIndex = 0.
1014 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1015 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary map that is serialized below to serve as the snapshot payload.
1017 Map<String, String> leadersSnapshot = new HashMap<>();
1018 leadersSnapshot.put("1", "A");
1019 leadersSnapshot.put("2", "B");
1020 leadersSnapshot.put("3", "C");
1022 // Set the snapshot variables in the replicated log and make term 2 the leader's current term.
1023 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1024 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1025 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1027 ByteString bs = toByteString(leadersSnapshot);
1028 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1029 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
// Start the install; the first chunk goes out immediately.
1032 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1034 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1035 InstallSnapshot.class);
1037 assertEquals(1, installSnapshot.getChunkIndex());
1038 assertEquals(3, installSnapshot.getTotalChunks());
// Reply for chunk 1 -> the leader is expected to send chunk 2.
1040 followerActor.underlyingActor().clear();
1041 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1042 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1044 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1046 assertEquals(2, installSnapshot.getChunkIndex());
1047 assertEquals(3, installSnapshot.getTotalChunks());
// Reply for chunk 2 -> another InstallSnapshot is expected (its chunk index is not asserted here).
1049 followerActor.underlyingActor().clear();
1050 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1051 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1053 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1055 // Send a snapshot reply one more time and make sure that no new InstallSnapshot is sent to the follower.
1056 followerActor.underlyingActor().clear();
1057 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1058 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
// getFirstMatching (unlike expectFirstMatching) is used because a null result is what is asserted.
1060 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1062 assertNull(installSnapshot);
// Verifies recovery from a bad InstallSnapshotReply: after chunk 1 of 3 has been sent, a reply carrying
// chunk index -1 and a 'false' flag is handled, and on the next SendHeartBeat the leader sends chunk 1 of 3
// again.
1067 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() {
1068 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1070 MockRaftActorContext actorContext = createActorContextWithFollower();
1072 final int commitIndex = 3;
1073 final int snapshotIndex = 2;
1074 final int snapshotTerm = 1;
1075 final int currentTerm = 2;
// Config with an overridden snapshot chunk size; the assertions below expect the payload to be split
// into 3 chunks.
1077 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1079 public int getSnapshotChunkSize() {
1084 actorContext.setCommitIndex(commitIndex);
1086 leader = new Leader(actorContext);
// Put the follower's tracked state at matchIndex = -1 / nextIndex = 0.
1088 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1089 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary map that is serialized below to serve as the snapshot payload.
1091 Map<String, String> leadersSnapshot = new HashMap<>();
1092 leadersSnapshot.put("1", "A");
1093 leadersSnapshot.put("2", "B");
1094 leadersSnapshot.put("3", "C");
1096 // Set the snapshot variables in the replicated log and make term 2 the leader's current term.
1097 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1098 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1099 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1101 ByteString bs = toByteString(leadersSnapshot);
1102 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1103 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
// NOTE(review): fixed one-second pause before starting the install; the reason is not visible here -
// presumably it lets earlier timer-driven traffic settle. Confirm before shortening or removing.
1106 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1107 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1109 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1110 InstallSnapshot.class);
1112 assertEquals(1, installSnapshot.getChunkIndex());
1113 assertEquals(3, installSnapshot.getTotalChunks());
1115 followerActor.underlyingActor().clear();
// Reply with an invalid chunk index (-1) and a 'false' flag (presumably success = false - TODO confirm).
1117 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1118 FOLLOWER_ID, -1, false));
// Wait one configured heartbeat interval before triggering the heartbeat by hand.
1120 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1121 TimeUnit.MILLISECONDS);
1123 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
// The same first chunk is expected to be sent again.
1125 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1127 assertEquals(1, installSnapshot.getChunkIndex());
1128 assertEquals(3, installSnapshot.getTotalChunks());
// Verifies the "last chunk hash code" carried by InstallSnapshot messages: the first chunk carries
// LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE, and the second chunk carries
// Arrays.hashCode(...) of the first chunk's data.
1132 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() {
1133 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1135 MockRaftActorContext actorContext = createActorContextWithFollower();
1137 final int commitIndex = 3;
1138 final int snapshotIndex = 2;
1139 final int snapshotTerm = 1;
1140 final int currentTerm = 2;
// Config with an overridden snapshot chunk size; the assertions below expect the payload to be split
// into 3 chunks.
1142 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1144 public int getSnapshotChunkSize() {
1149 actorContext.setCommitIndex(commitIndex);
1151 leader = new Leader(actorContext);
// Put the follower's tracked state at matchIndex = -1 / nextIndex = 0.
1153 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1154 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary map that is serialized below to serve as the snapshot payload.
1156 Map<String, String> leadersSnapshot = new HashMap<>();
1157 leadersSnapshot.put("1", "A");
1158 leadersSnapshot.put("2", "B");
1159 leadersSnapshot.put("3", "C");
1161 // Set the snapshot variables in the replicated log and make term 2 the leader's current term.
1162 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1163 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1164 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1166 ByteString bs = toByteString(leadersSnapshot);
1167 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1168 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1171 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1173 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1174 InstallSnapshot.class);
1176 assertEquals(1, installSnapshot.getChunkIndex());
1177 assertEquals(3, installSnapshot.getTotalChunks());
// No previous chunk exists yet, so the initial sentinel hash code is expected.
1178 assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1179 installSnapshot.getLastChunkHashCode().get().intValue());
// Remember the hash of chunk 1's data; chunk 2 must report it as its last-chunk hash code.
1181 final int hashCode = Arrays.hashCode(installSnapshot.getData());
1183 followerActor.underlyingActor().clear();
// Reply for chunk 1 so that the leader moves on to chunk 2.
1185 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1186 FOLLOWER_ID, 1, true));
1188 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1190 assertEquals(2, installSnapshot.getChunkIndex());
1191 assertEquals(3, installSnapshot.getTotalChunks());
1192 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
// Exercises LeaderInstallSnapshotState on its own with a chunk size of 50: walks the serialized payload in
// 50-byte steps and checks that each chunk returned by getNextChunk() has the expected length (50, or the
// remainder for the final chunk), that the state's chunk index tracks the test's chunkIndex counter, and
// that the final counter value equals getTotalChunks().
1196 public void testLeaderInstallSnapshotState() throws IOException {
1197 logStart("testLeaderInstallSnapshotState");
// Arbitrary map that is serialized below to serve as the snapshot payload.
1199 Map<String, String> leadersSnapshot = new HashMap<>();
1200 leadersSnapshot.put("1", "A");
1201 leadersSnapshot.put("2", "B");
1202 leadersSnapshot.put("3", "C");
1204 ByteString bs = toByteString(leadersSnapshot);
1205 byte[] barray = bs.toByteArray();
1207 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1208 fts.setSnapshotBytes(ByteSource.wrap(barray));
// Sanity check: the byte array is a full copy of the ByteString.
1210 assertEquals(bs.size(), barray.length);
// i is the byte offset of the current chunk; 'length' is the exclusive end offset, clamped to the array size.
1213 for (int i = 0; i < barray.length; i = i + 50) {
1214 int length = i + 50;
1217 if (i + 50 > barray.length) {
1218 length = barray.length;
1221 byte[] chunk = fts.getNextChunk();
1222 assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1223 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
// Mark the chunk as sent and advance the state's chunk index unless this was the last chunk.
1225 fts.markSendStatus(true);
1226 if (!fts.isLastChunk(chunkIndex)) {
1227 fts.incrementChunkIndex();
1231 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
// Creates the behavior under test: a new Leader bound to the given actor context.
1236 protected Leader createBehavior(final RaftActorContext actorContext) {
1237 return new Leader(actorContext);
// Creates the default mock context for these tests, backed by the leaderActor test actor.
1241 protected MockRaftActorContext createActorContext() {
1242 return createActorContext(leaderActor);
// Creates a mock context for the given actor, using LEADER_ID as the member id.
1246 protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
1247 return createActorContext(LEADER_ID, actorRef);
// Builds a MockRaftActorContext for the given member id and actor, configured with a 50 ms heartbeat
// interval, an election timeout factor of 100000 and this test's payloadVersion.
1250 private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
1251 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1252 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
// Very large factor, presumably so that no election timeout fires during a test - TODO confirm.
1253 configParams.setElectionTimeoutFactor(100000);
1254 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1255 context.setConfigParams(configParams);
1256 context.setPayloadVersion(payloadVersion);
// Creates the default leader-side context and registers a single peer: FOLLOWER_ID at followerActor's path.
1260 private MockRaftActorContext createActorContextWithFollower() {
1261 MockRaftActorContext actorContext = createActorContext();
1262 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1263 followerActor.path().toString()).build());
1264 return actorContext;
// Creates a follower-side context (FOLLOWER_ID / followerActor) whose only peer is LEADER_ID at
// leaderActor's path. The config set by createActorContext is replaced with a fresh DefaultConfigParamsImpl
// on which only the election timeout factor (10000) is set explicitly.
1267 private MockRaftActorContext createFollowerActorContextWithLeader() {
1268 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1269 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1270 followerConfig.setElectionTimeoutFactor(10000);
1271 followerActorContext.setConfigParams(followerConfig);
1272 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1273 return followerActorContext;
// Leader and follower are given identical logs and the same commit index (1). Verifies the initial
// AppendEntries sent on leader creation (leader commit -1, no entries, prevLogIndex 0) and the follower's
// AppendEntriesReply (log last index 2, log last term 1).
1277 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() {
1278 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1280 final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1282 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Run a real Follower behavior inside the follower test actor so that it answers the leader's messages.
1284 Follower follower = new Follower(followerActorContext);
1285 followerActor.underlyingActor().setBehavior(follower);
1286 followerActorContext.setCurrentBehavior(follower);
1288 Map<String, String> peerAddresses = new HashMap<>();
1289 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1291 leaderActorContext.setPeerAddresses(peerAddresses);
1293 leaderActorContext.getReplicatedLog().removeFrom(0);
// Leader log built by createEntries(0, 3, 1); the reply assertions below imply last index 2 and term 1.
1296 leaderActorContext.setReplicatedLog(
1297 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1299 leaderActorContext.setCommitIndex(1);
1301 followerActorContext.getReplicatedLog().removeFrom(0);
1303 // The follower has exactly the same log entries and the same commit index.
1304 followerActorContext.setReplicatedLog(
1305 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1307 followerActorContext.setCommitIndex(1);
1309 leader = new Leader(leaderActorContext);
1310 leaderActorContext.setCurrentBehavior(leader);
// Initial AppendEntries sent when the leader is created.
1312 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1314 assertEquals(-1, appendEntries.getLeaderCommit());
1315 assertEquals(0, appendEntries.getEntries().size());
1316 assertEquals(0, appendEntries.getPrevLogIndex());
1318 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1319 leaderActor, AppendEntriesReply.class);
1321 assertEquals(2, appendEntriesReply.getLogLastIndex());
1322 assertEquals(1, appendEntriesReply.getLogLastTerm());
1324 // NOTE(review): the original comment read "follower returns its next index", but the two assertions below
// only repeat the log-last-index/term checks made just above - confirm what was meant to be verified.
1325 assertEquals(2, appendEntriesReply.getLogLastIndex());
1326 assertEquals(1, appendEntriesReply.getLogLastTerm());
// Leader and follower hold the same log entries, but the follower's commit index (2) is ahead of the
// leader's (1). Verifies the initial AppendEntries / reply exchange and then, after the leader has handled
// that reply and a heartbeat is triggered, that the next AppendEntries carries leader commit 2 and
// prevLogIndex 2 with no entries, and that the follower's commit index is still 2.
1332 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() {
1333 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1335 final MockRaftActorContext leaderActorContext = createActorContext();
1337 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1338 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
// Run a real Follower behavior inside the follower test actor so that it answers the leader's messages.
1340 Follower follower = new Follower(followerActorContext);
1341 followerActor.underlyingActor().setBehavior(follower);
1342 followerActorContext.setCurrentBehavior(follower);
1344 Map<String, String> leaderPeerAddresses = new HashMap<>();
1345 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1347 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1349 leaderActorContext.getReplicatedLog().removeFrom(0);
1351 leaderActorContext.setReplicatedLog(
1352 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1354 leaderActorContext.setCommitIndex(1);
1356 followerActorContext.getReplicatedLog().removeFrom(0);
1358 followerActorContext.setReplicatedLog(
1359 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1361 // The follower has the same log entries, but its commit index is greater than the leader's commit index.
1362 followerActorContext.setCommitIndex(2);
1364 leader = new Leader(leaderActorContext);
1366 // Initial heartbeat
1367 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1369 assertEquals(-1, appendEntries.getLeaderCommit());
1370 assertEquals(0, appendEntries.getEntries().size());
1371 assertEquals(0, appendEntries.getPrevLogIndex());
1373 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1374 leaderActor, AppendEntriesReply.class);
1376 assertEquals(2, appendEntriesReply.getLogLastIndex());
1377 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): this installs the *follower* behavior on leaderActor, whereas the sibling tests install
// 'leader' at the equivalent point - confirm whether 'follower' is intended here.
1379 leaderActor.underlyingActor().setBehavior(follower);
// Feed the follower's reply to the leader by hand.
1380 leader.handleMessage(followerActor, appendEntriesReply);
1382 leaderActor.underlyingActor().clear();
1383 followerActor.underlyingActor().clear();
// Wait one configured heartbeat interval before triggering the heartbeat by hand.
1385 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1386 TimeUnit.MILLISECONDS);
1388 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1390 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1392 assertEquals(2, appendEntries.getLeaderCommit());
1393 assertEquals(0, appendEntries.getEntries().size());
1394 assertEquals(2, appendEntries.getPrevLogIndex());
1396 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1398 assertEquals(2, appendEntriesReply.getLogLastIndex());
1399 assertEquals(1, appendEntriesReply.getLogLastTerm());
1401 assertEquals(2, followerActorContext.getCommitIndex());
// The follower's log (built by createEntries(0, 1, 1)) is shorter than the leader's (createEntries(0, 3, 1)).
// Verifies that after the leader handles the follower's reply to the initial AppendEntries it sends the two
// missing entries (indexes 1 and 2) with prevLogIndex 0, sets the follower's next index to 3, and that the
// follower applies both entries and ends up with commit index 2 and last index 2.
1407 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1408 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1410 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// Very long heartbeat interval, presumably so that no periodic heartbeat disturbs the message
// assertions below - TODO confirm.
1411 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1412 new FiniteDuration(1000, TimeUnit.SECONDS));
1414 leaderActorContext.setReplicatedLog(
1415 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1416 long leaderCommitIndex = 2;
1417 leaderActorContext.setCommitIndex(leaderCommitIndex);
1418 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the entries the follower is missing so their payloads can be compared later.
1420 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1421 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1423 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1425 followerActorContext.setReplicatedLog(
1426 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1427 followerActorContext.setCommitIndex(0);
1428 followerActorContext.setLastApplied(0);
// Run a real Follower behavior inside the follower test actor so that it answers the leader's messages.
1430 Follower follower = new Follower(followerActorContext);
1431 followerActor.underlyingActor().setBehavior(follower);
1433 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply to it before clearing both collectors.
1435 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1436 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1437 AppendEntriesReply.class);
1439 MessageCollectorActor.clearMessages(followerActor);
1440 MessageCollectorActor.clearMessages(leaderActor);
1442 // Verify initial AppendEntries sent.
1443 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1444 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1445 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1447 leaderActor.underlyingActor().setBehavior(leader);
// Feed the follower's reply to the leader by hand; this should make the leader send the missing entries.
1449 leader.handleMessage(followerActor, appendEntriesReply);
// Wait for the follower's reply to the follow-up AppendEntries, then inspect that AppendEntries.
1451 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1452 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1454 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1455 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1456 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1458 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1459 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1460 appendEntries.getEntries().get(0).getData());
1461 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1462 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1463 appendEntries.getEntries().get(1).getData());
1465 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1466 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
// The follower is expected to emit one ApplyState per newly received entry.
1468 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1470 ApplyState applyState = applyStateList.get(0);
1471 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1472 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1473 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1474 applyState.getReplicatedLogEntry().getData());
1476 applyState = applyStateList.get(1);
1477 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1478 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1479 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1480 applyState.getReplicatedLogEntry().getData());
1482 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1483 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
// The follower starts with an empty log (commit index and last applied both -1) while the leader's log is
// built by createEntries(0, 2, 1). Verifies that after the leader handles the follower's reply to the
// initial AppendEntries it sends both entries (indexes 0 and 1) with prevLogIndex -1, sets the follower's
// next index to 2, and that the follower applies both entries and ends up with commit index 1 and last
// index 1.
1487 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1488 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1490 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// Very long heartbeat interval, presumably so that no periodic heartbeat disturbs the message
// assertions below - TODO confirm.
1491 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1492 new FiniteDuration(1000, TimeUnit.SECONDS));
1494 leaderActorContext.setReplicatedLog(
1495 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1496 long leaderCommitIndex = 1;
1497 leaderActorContext.setCommitIndex(leaderCommitIndex);
1498 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the leader's entries so their payloads can be compared later.
1500 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1501 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1503 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// Follower log built with no entries.
1505 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1506 followerActorContext.setCommitIndex(-1);
1507 followerActorContext.setLastApplied(-1);
// Run a real Follower behavior inside the follower test actor so that it answers the leader's messages.
1509 Follower follower = new Follower(followerActorContext);
1510 followerActor.underlyingActor().setBehavior(follower);
1511 followerActorContext.setCurrentBehavior(follower);
1513 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply to it before clearing both collectors.
1515 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1516 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1517 AppendEntriesReply.class);
1519 MessageCollectorActor.clearMessages(followerActor);
1520 MessageCollectorActor.clearMessages(leaderActor);
1522 // Verify the initial AppendEntries: leader commit of -1, no entries, prevLogIndex 0.
1523 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1524 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1525 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1527 leaderActor.underlyingActor().setBehavior(leader);
1528 leaderActorContext.setCurrentBehavior(leader);
// Feed the follower's reply to the leader by hand; this should make the leader send the missing entries.
1530 leader.handleMessage(followerActor, appendEntriesReply);
// Wait for the follower's reply to the follow-up AppendEntries, then inspect that AppendEntries.
1532 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1533 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1535 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1536 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1537 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1539 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1540 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1541 appendEntries.getEntries().get(0).getData());
1542 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1543 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1544 appendEntries.getEntries().get(1).getData());
1546 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1547 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
// The follower is expected to emit one ApplyState per newly received entry.
1549 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1551 ApplyState applyState = applyStateList.get(0);
1552 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1553 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1554 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1555 applyState.getReplicatedLogEntry().getData());
1557 applyState = applyStateList.get(1);
1558 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1559 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1560 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1561 applyState.getReplicatedLogEntry().getData());
1563 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1564 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1568 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1569 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1571 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1572 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1573 new FiniteDuration(1000, TimeUnit.SECONDS));
1575 leaderActorContext.setReplicatedLog(
1576 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1577 long leaderCommitIndex = 1;
1578 leaderActorContext.setCommitIndex(leaderCommitIndex);
1579 leaderActorContext.setLastApplied(leaderCommitIndex);
1581 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1582 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1584 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1586 followerActorContext.setReplicatedLog(
1587 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1588 followerActorContext.setCommitIndex(-1);
1589 followerActorContext.setLastApplied(-1);
1591 Follower follower = new Follower(followerActorContext);
1592 followerActor.underlyingActor().setBehavior(follower);
1593 followerActorContext.setCurrentBehavior(follower);
1595 leader = new Leader(leaderActorContext);
1597 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1598 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1599 AppendEntriesReply.class);
1601 MessageCollectorActor.clearMessages(followerActor);
1602 MessageCollectorActor.clearMessages(leaderActor);
1604 // Verify initial AppendEntries sent with the leader's current commit index.
1605 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1606 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1607 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1609 leaderActor.underlyingActor().setBehavior(leader);
1610 leaderActorContext.setCurrentBehavior(leader);
1612 leader.handleMessage(followerActor, appendEntriesReply);
1614 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1615 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1617 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1618 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1619 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1621 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1622 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1623 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1624 appendEntries.getEntries().get(0).getData());
1625 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1626 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1627 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1628 appendEntries.getEntries().get(1).getData());
1630 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1631 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1633 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1635 ApplyState applyState = applyStateList.get(0);
1636 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1637 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1638 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1639 applyState.getReplicatedLogEntry().getData());
1641 applyState = applyStateList.get(1);
1642 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1643 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1644 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1645 applyState.getReplicatedLogEntry().getData());
1647 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1648 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1649 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1653 public void testHandleAppendEntriesReplyWithNewerTerm() {
1654 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1656 MockRaftActorContext leaderActorContext = createActorContext();
1657 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1658 new FiniteDuration(10000, TimeUnit.SECONDS));
1660 leaderActorContext.setReplicatedLog(
1661 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1663 leader = new Leader(leaderActorContext);
1664 leaderActor.underlyingActor().setBehavior(leader);
1665 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1667 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1668 AppendEntriesReply.class);
1670 assertEquals(false, appendEntriesReply.isSuccess());
1671 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1673 MessageCollectorActor.clearMessages(leaderActor);
1677 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1678 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1680 MockRaftActorContext leaderActorContext = createActorContext();
1681 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1682 new FiniteDuration(10000, TimeUnit.SECONDS));
1684 leaderActorContext.setReplicatedLog(
1685 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1686 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1688 leader = new Leader(leaderActorContext);
1689 leaderActor.underlyingActor().setBehavior(leader);
1690 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1692 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1693 AppendEntriesReply.class);
1695 assertEquals(false, appendEntriesReply.isSuccess());
1696 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1698 MessageCollectorActor.clearMessages(leaderActor);
1702 public void testHandleAppendEntriesReplySuccess() {
1703 logStart("testHandleAppendEntriesReplySuccess");
1705 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1707 leaderActorContext.setReplicatedLog(
1708 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1710 leaderActorContext.setCommitIndex(1);
1711 leaderActorContext.setLastApplied(1);
1712 leaderActorContext.getTermInformation().update(1, "leader");
1714 leader = new Leader(leaderActorContext);
1716 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1718 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1719 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1721 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1723 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1725 assertEquals(RaftState.Leader, raftActorBehavior.state());
1727 assertEquals(2, leaderActorContext.getCommitIndex());
1729 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1730 leaderActor, ApplyJournalEntries.class);
1732 assertEquals(2, leaderActorContext.getLastApplied());
1734 assertEquals(2, applyJournalEntries.getToIndex());
1736 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1739 assertEquals(1,applyStateList.size());
1741 ApplyState applyState = applyStateList.get(0);
1743 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1745 assertEquals(2, followerInfo.getMatchIndex());
1746 assertEquals(3, followerInfo.getNextIndex());
1747 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1748 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1752 public void testHandleAppendEntriesReplyUnknownFollower() {
1753 logStart("testHandleAppendEntriesReplyUnknownFollower");
1755 MockRaftActorContext leaderActorContext = createActorContext();
1757 leader = new Leader(leaderActorContext);
1759 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1761 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1763 assertEquals(RaftState.Leader, raftActorBehavior.state());
1767 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1768 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1770 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1771 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1772 new FiniteDuration(1000, TimeUnit.SECONDS));
1773 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1775 leaderActorContext.setReplicatedLog(
1776 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1777 long leaderCommitIndex = 3;
1778 leaderActorContext.setCommitIndex(leaderCommitIndex);
1779 leaderActorContext.setLastApplied(leaderCommitIndex);
1781 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1782 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1783 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1784 final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1786 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1788 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1789 followerActorContext.setCommitIndex(-1);
1790 followerActorContext.setLastApplied(-1);
1792 Follower follower = new Follower(followerActorContext);
1793 followerActor.underlyingActor().setBehavior(follower);
1794 followerActorContext.setCurrentBehavior(follower);
1796 leader = new Leader(leaderActorContext);
1798 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1799 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1800 AppendEntriesReply.class);
1802 MessageCollectorActor.clearMessages(followerActor);
1803 MessageCollectorActor.clearMessages(leaderActor);
1805 // Verify initial AppendEntries sent with the leader's current commit index.
1806 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1807 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1808 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1810 leaderActor.underlyingActor().setBehavior(leader);
1811 leaderActorContext.setCurrentBehavior(leader);
1813 leader.handleMessage(followerActor, appendEntriesReply);
1815 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1816 AppendEntries.class, 2);
1817 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1819 appendEntries = appendEntriesList.get(0);
1820 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1821 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1822 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1824 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1825 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1826 appendEntries.getEntries().get(0).getData());
1827 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1828 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1829 appendEntries.getEntries().get(1).getData());
1831 appendEntries = appendEntriesList.get(1);
1832 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1833 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1834 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1836 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1837 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1838 appendEntries.getEntries().get(0).getData());
1839 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1840 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1841 appendEntries.getEntries().get(1).getData());
1843 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1844 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1846 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1848 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1849 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1853 public void testHandleRequestVoteReply() {
1854 logStart("testHandleRequestVoteReply");
1856 MockRaftActorContext leaderActorContext = createActorContext();
1858 leader = new Leader(leaderActorContext);
1860 // Should be a no-op.
1861 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1862 new RequestVoteReply(1, true));
1864 assertEquals(RaftState.Leader, raftActorBehavior.state());
1866 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1868 assertEquals(RaftState.Leader, raftActorBehavior.state());
1872 public void testIsolatedLeaderCheckNoFollowers() {
1873 logStart("testIsolatedLeaderCheckNoFollowers");
1875 MockRaftActorContext leaderActorContext = createActorContext();
1877 leader = new Leader(leaderActorContext);
1878 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1879 assertTrue(newBehavior instanceof Leader);
1883 public void testIsolatedLeaderCheckNoVotingFollowers() {
1884 logStart("testIsolatedLeaderCheckNoVotingFollowers");
1886 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1887 Follower follower = new Follower(followerActorContext);
1888 followerActor.underlyingActor().setBehavior(follower);
1890 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1891 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1892 new FiniteDuration(1000, TimeUnit.SECONDS));
1893 leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1895 leader = new Leader(leaderActorContext);
1896 leader.getFollower(FOLLOWER_ID).markFollowerActive();
1897 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1898 assertTrue("Expected Leader", newBehavior instanceof Leader);
1901 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
1902 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1903 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1905 MockRaftActorContext leaderActorContext = createActorContext();
1907 Map<String, String> peerAddresses = new HashMap<>();
1908 peerAddresses.put("follower-1", followerActor1.path().toString());
1909 peerAddresses.put("follower-2", followerActor2.path().toString());
1911 leaderActorContext.setPeerAddresses(peerAddresses);
1912 leaderActorContext.setRaftPolicy(raftPolicy);
1914 leader = new Leader(leaderActorContext);
1916 leader.markFollowerActive("follower-1");
1917 leader.markFollowerActive("follower-2");
1918 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1919 assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1921 // kill 1 follower and verify if that got killed
1922 final TestKit probe = new TestKit(getSystem());
1923 probe.watch(followerActor1);
1924 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1925 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1926 assertEquals(termMsg1.getActor(), followerActor1);
1928 leader.markFollowerInActive("follower-1");
1929 leader.markFollowerActive("follower-2");
1930 newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1931 assertTrue("Behavior not instance of Leader when majority of followers are active",
1932 newBehavior instanceof Leader);
1934 // kill 2nd follower and leader should change to Isolated leader
1935 followerActor2.tell(PoisonPill.getInstance(), null);
1936 probe.watch(followerActor2);
1937 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1938 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1939 assertEquals(termMsg2.getActor(), followerActor2);
1941 leader.markFollowerInActive("follower-2");
1942 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1946 public void testIsolatedLeaderCheckTwoFollowers() {
1947 logStart("testIsolatedLeaderCheckTwoFollowers");
1949 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1951 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1952 newBehavior instanceof IsolatedLeader);
1956 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() {
1957 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1959 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1961 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1962 newBehavior instanceof Leader);
1966 public void testLaggingFollowerStarvation() {
1967 logStart("testLaggingFollowerStarvation");
1969 String leaderActorId = actorFactory.generateActorId("leader");
1970 String follower1ActorId = actorFactory.generateActorId("follower");
1971 String follower2ActorId = actorFactory.generateActorId("follower");
1973 final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1974 final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1976 MockRaftActorContext leaderActorContext =
1977 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1979 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1980 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1981 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1983 leaderActorContext.setConfigParams(configParams);
1985 leaderActorContext.setReplicatedLog(
1986 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1988 Map<String, String> peerAddresses = new HashMap<>();
1989 peerAddresses.put(follower1ActorId,
1990 follower1Actor.path().toString());
1991 peerAddresses.put(follower2ActorId,
1992 follower2Actor.path().toString());
1994 leaderActorContext.setPeerAddresses(peerAddresses);
1995 leaderActorContext.getTermInformation().update(1, leaderActorId);
1997 leader = createBehavior(leaderActorContext);
1999 leaderActor.underlyingActor().setBehavior(leader);
2001 for (int i = 1; i < 6; i++) {
2002 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
2003 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
2004 new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
2005 assertTrue(newBehavior == leader);
2006 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
2009 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
2010 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
2012 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
2013 heartbeats.size() > 1);
2015 // Check if follower-2 got AppendEntries during this time and was not starved
2016 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
2018 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
2019 appendEntries.size() > 1);
2023 public void testReplicationConsensusWithNonVotingFollower() {
2024 logStart("testReplicationConsensusWithNonVotingFollower");
2026 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2027 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2028 new FiniteDuration(1000, TimeUnit.SECONDS));
2030 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2031 leaderActorContext.setCommitIndex(-1);
2032 leaderActorContext.setLastApplied(-1);
2034 String nonVotingFollowerId = "nonvoting-follower";
2035 ActorRef nonVotingFollowerActor = actorFactory.createActor(
2036 MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
2038 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2039 VotingState.NON_VOTING);
2041 leader = new Leader(leaderActorContext);
2042 leaderActorContext.setCurrentBehavior(leader);
2044 // Ignore initial heartbeats
2045 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2046 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2048 MessageCollectorActor.clearMessages(followerActor);
2049 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2050 MessageCollectorActor.clearMessages(leaderActor);
2052 // Send a Replicate message and wait for AppendEntries.
2053 sendReplicate(leaderActorContext, 0);
2055 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2056 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2058 // Send reply only from the voting follower and verify consensus via ApplyState.
2059 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2061 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2063 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2065 MessageCollectorActor.clearMessages(followerActor);
2066 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2067 MessageCollectorActor.clearMessages(leaderActor);
2069 // Send another Replicate message
2070 sendReplicate(leaderActorContext, 1);
2072 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2073 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2074 AppendEntries.class);
2075 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2076 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2078 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2079 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2081 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2083 // Send reply from the voting follower and verify consensus.
2084 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2086 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2090 public void testTransferLeadershipWithFollowerInSync() {
2091 logStart("testTransferLeadershipWithFollowerInSync");
2093 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2094 leaderActorContext.setLastApplied(-1);
2095 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2096 new FiniteDuration(1000, TimeUnit.SECONDS));
2097 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2099 leader = new Leader(leaderActorContext);
2100 leaderActorContext.setCurrentBehavior(leader);
2102 // Initial heartbeat
2103 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2104 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2105 MessageCollectorActor.clearMessages(followerActor);
2107 sendReplicate(leaderActorContext, 0);
2108 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2110 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2111 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2112 MessageCollectorActor.clearMessages(followerActor);
2114 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2115 leader.transferLeadership(mockTransferCohort);
2117 verify(mockTransferCohort, never()).transferComplete();
2118 doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2119 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2120 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2122 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2123 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2125 // Leader should force an election timeout
2126 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2128 verify(mockTransferCohort).transferComplete();
2132 public void testTransferLeadershipWithEmptyLog() {
2133 logStart("testTransferLeadershipWithEmptyLog");
2135 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2136 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2137 new FiniteDuration(1000, TimeUnit.SECONDS));
2138 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2140 leader = new Leader(leaderActorContext);
2141 leaderActorContext.setCurrentBehavior(leader);
2143 // Initial heartbeat
2144 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2145 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2146 MessageCollectorActor.clearMessages(followerActor);
2148 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2149 doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2150 leader.transferLeadership(mockTransferCohort);
2152 verify(mockTransferCohort, never()).transferComplete();
2153 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2154 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2156 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2157 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2159 // Leader should force an election timeout
2160 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2162 verify(mockTransferCohort).transferComplete();
2166 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2167 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2169 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2170 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2171 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2173 leader = new Leader(leaderActorContext);
2174 leaderActorContext.setCurrentBehavior(leader);
2176 // Initial heartbeat
2177 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2178 MessageCollectorActor.clearMessages(followerActor);
2180 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2181 doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2182 leader.transferLeadership(mockTransferCohort);
2184 verify(mockTransferCohort, never()).transferComplete();
2186 // Sync up the follower.
2187 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2188 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2189 MessageCollectorActor.clearMessages(followerActor);
2191 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2192 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2193 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2194 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2195 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2197 // Leader should force an election timeout
2198 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2200 verify(mockTransferCohort).transferComplete();
2204 public void testTransferLeadershipWithFollowerSyncTimeout() {
2205 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2207 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2208 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2209 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2210 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2211 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2213 leader = new Leader(leaderActorContext);
2214 leaderActorContext.setCurrentBehavior(leader);
2216 // Initial heartbeat
2217 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2218 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2219 MessageCollectorActor.clearMessages(followerActor);
2221 sendReplicate(leaderActorContext, 0);
2222 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2224 MessageCollectorActor.clearMessages(followerActor);
2226 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2227 leader.transferLeadership(mockTransferCohort);
2229 verify(mockTransferCohort, never()).transferComplete();
2231 // Send heartbeats to time out the transfer.
2232 for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2233 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2234 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2235 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2238 verify(mockTransferCohort).abortTransfer();
2239 verify(mockTransferCohort, never()).transferComplete();
2240 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2244 public void testReplicationWithPayloadSizeThatExceedsThreshold() {
2245 logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
2247 final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
2248 Arrays.asList(new SimpleReplicatedLogEntry(0, 1,
2249 new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
2250 final MockRaftActorContext.MockPayload largePayload =
2251 new MockRaftActorContext.MockPayload("large", serializedSize);
2253 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2254 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2255 new FiniteDuration(300, TimeUnit.MILLISECONDS));
2256 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
2257 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2258 leaderActorContext.setCommitIndex(-1);
2259 leaderActorContext.setLastApplied(-1);
2261 leader = new Leader(leaderActorContext);
2262 leaderActorContext.setCurrentBehavior(leader);
2264 // Send initial heartbeat reply so follower is marked active
2265 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2266 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2267 MessageCollectorActor.clearMessages(followerActor);
2269 // Send normal payload first to prime commit index.
2270 final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2271 sendReplicate(leaderActorContext, term, 0);
2273 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2274 assertEquals("Entries size", 1, appendEntries.getEntries().size());
2275 assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
2277 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
2278 assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
2279 MessageCollectorActor.clearMessages(followerActor);
2281 // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
2282 sendReplicate(leaderActorContext, term, 1, largePayload);
2284 MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2285 assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
2286 assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
2288 final Identifier slicingId = messageSlice.getIdentifier();
2290 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2291 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
2292 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
2293 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2294 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2295 MessageCollectorActor.clearMessages(followerActor);
2297 // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
2299 // Sleep for the heartbeat interval so AppendEntries is sent.
2300 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2301 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2303 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2305 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2306 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2307 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2308 MessageCollectorActor.clearMessages(followerActor);
2310 // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
2312 leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
2313 messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2314 assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
2316 leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
2318 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
2320 MessageCollectorActor.clearMessages(followerActor);
2322 // Send another normal payload.
2324 sendReplicate(leaderActorContext, term, 2);
2326 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2327 assertEquals("Entries size", 1, appendEntries.getEntries().size());
2328 assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
2329 assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
2333 public void testLargePayloadSlicingExpiration() {
2334 logStart("testLargePayloadSlicingExpiration");
2336 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2337 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2338 new FiniteDuration(100, TimeUnit.MILLISECONDS));
2339 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
2340 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
2341 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2342 leaderActorContext.setCommitIndex(-1);
2343 leaderActorContext.setLastApplied(-1);
2345 final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2346 leader = new Leader(leaderActorContext);
2347 leaderActorContext.setCurrentBehavior(leader);
2349 // Send initial heartbeat reply so follower is marked active
2350 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2351 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2352 MessageCollectorActor.clearMessages(followerActor);
2354 sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
2355 leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
2356 MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2358 // Sleep for at least 3 * election timeout so the slicing state expires.
2359 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2360 .getElectionTimeOutInterval().toMillis() * 3 + 50, TimeUnit.MILLISECONDS);
2361 MessageCollectorActor.clearMessages(followerActor);
2363 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2365 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2366 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2367 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2369 MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
2370 MessageCollectorActor.clearMessages(followerActor);
2372 // Send an AppendEntriesReply - this should restart the slicing.
2374 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2375 .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
2377 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
2379 MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2383 public void testLeaderAddressInAppendEntries() {
2384 logStart("testLeaderAddressInAppendEntries");
2386 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2387 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2388 FiniteDuration.create(50, TimeUnit.MILLISECONDS));
2389 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2390 leaderActorContext.setCommitIndex(-1);
2391 leaderActorContext.setLastApplied(-1);
2393 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
2394 peerId -> leaderActor.path().toString());
2396 leader = new Leader(leaderActorContext);
2397 leaderActorContext.setCurrentBehavior(leader);
2399 // Initial heartbeat shouldn't have the leader address
2401 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2402 assertFalse(appendEntries.getLeaderAddress().isPresent());
2403 MessageCollectorActor.clearMessages(followerActor);
2405 // Send AppendEntriesReply indicating the follower needs the leader address
2407 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
2408 RaftVersions.CURRENT_VERSION));
2410 // Sleep for the heartbeat interval so AppendEntries is sent.
2411 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2412 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2414 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2416 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2417 assertTrue(appendEntries.getLeaderAddress().isPresent());
2418 assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get());
2419 MessageCollectorActor.clearMessages(followerActor);
2421 // Send AppendEntriesReply indicating the follower does not need the leader address
2423 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
2424 RaftVersions.CURRENT_VERSION));
2426 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2427 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2429 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2431 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2432 assertFalse(appendEntries.getLeaderAddress().isPresent());
2436 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
2437 final ActorRef actorRef, final RaftRPC rpc) {
2438 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2439 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2442 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2444 private final long electionTimeOutIntervalMillis;
2445 private final int snapshotChunkSize;
2447 MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
2448 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2449 this.snapshotChunkSize = snapshotChunkSize;
2453 public FiniteDuration getElectionTimeOutInterval() {
2454 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2458 public int getSnapshotChunkSize() {
2459 return snapshotChunkSize;