2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.verify;
22 import akka.actor.ActorRef;
23 import akka.actor.PoisonPill;
24 import akka.actor.Props;
25 import akka.actor.Terminated;
26 import akka.protobuf.ByteString;
27 import akka.testkit.TestActorRef;
28 import akka.testkit.javadsl.TestKit;
29 import com.google.common.base.Optional;
30 import com.google.common.collect.ImmutableMap;
31 import com.google.common.io.ByteSource;
32 import com.google.common.util.concurrent.Uninterruptibles;
33 import java.io.IOException;
34 import java.io.OutputStream;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.apache.commons.lang3.SerializationUtils;
43 import org.junit.After;
44 import org.junit.Test;
45 import org.opendaylight.controller.cluster.messaging.MessageSlice;
46 import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
47 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
48 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
49 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
50 import org.opendaylight.controller.cluster.raft.RaftActorContext;
51 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
52 import org.opendaylight.controller.cluster.raft.RaftState;
53 import org.opendaylight.controller.cluster.raft.RaftVersions;
54 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
55 import org.opendaylight.controller.cluster.raft.VotingState;
56 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
57 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
58 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
59 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
60 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
61 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
62 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
63 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
64 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
65 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
66 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
67 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
68 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
69 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
70 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
71 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
72 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
73 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
74 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
75 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
76 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
77 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
78 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
79 import org.opendaylight.yangtools.concepts.Identifier;
80 import scala.concurrent.duration.FiniteDuration;
82 public class LeaderTest extends AbstractLeaderTest<Leader> {
84 static final String FOLLOWER_ID = "follower";
85 public static final String LEADER_ID = "leader";
87 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
88 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
90 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
91 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
93 private Leader leader;
94 private final short payloadVersion = 5;
98 public void tearDown() {
107 public void testHandleMessageForUnknownMessage() {
108 logStart("testHandleMessageForUnknownMessage");
110 leader = new Leader(createActorContext());
112 // handle message should null when it receives an unknown message
113 assertNull(leader.handleMessage(followerActor, "foo"));
117 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
118 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
120 MockRaftActorContext actorContext = createActorContextWithFollower();
121 actorContext.setCommitIndex(-1);
122 actorContext.setPayloadVersion(payloadVersion);
125 actorContext.getTermInformation().update(term, "");
127 leader = new Leader(actorContext);
128 actorContext.setCurrentBehavior(leader);
130 // Leader should send an immediate heartbeat with no entries as follower is inactive.
131 final long lastIndex = actorContext.getReplicatedLog().lastIndex();
132 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
133 assertEquals("getTerm", term, appendEntries.getTerm());
134 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
135 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
136 assertEquals("Entries size", 0, appendEntries.getEntries().size());
137 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
139 // The follower would normally reply - simulate that explicitly here.
140 leader.handleMessage(followerActor, new AppendEntriesReply(
141 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
142 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
144 followerActor.underlyingActor().clear();
146 // Sleep for the heartbeat interval so AppendEntries is sent.
147 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
148 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
150 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
152 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
153 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
154 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
155 assertEquals("Entries size", 1, appendEntries.getEntries().size());
156 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
157 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
158 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
162 private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
163 return sendReplicate(actorContext, 1, index);
166 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
167 return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
170 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) {
171 SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
172 actorContext.getReplicatedLog().append(newEntry);
173 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
177 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
178 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
180 MockRaftActorContext actorContext = createActorContextWithFollower();
183 actorContext.getTermInformation().update(term, "");
185 leader = new Leader(actorContext);
187 // Leader will send an immediate heartbeat - ignore it.
188 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
190 // The follower would normally reply - simulate that explicitly here.
191 long lastIndex = actorContext.getReplicatedLog().lastIndex();
192 leader.handleMessage(followerActor, new AppendEntriesReply(
193 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
194 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
196 followerActor.underlyingActor().clear();
198 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
200 // State should not change
201 assertTrue(raftBehavior instanceof Leader);
203 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
204 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
205 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
206 assertEquals("Entries size", 1, appendEntries.getEntries().size());
207 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
208 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
209 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
210 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
214 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() {
215 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
217 MockRaftActorContext actorContext = createActorContextWithFollower();
218 actorContext.setCommitIndex(-1);
219 actorContext.setLastApplied(-1);
221 // The raft context is initialized with a couple log entries. However the commitIndex
222 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
223 // committed and applied. Now it regains leadership with a higher term (2).
224 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
225 long newTerm = prevTerm + 1;
226 actorContext.getTermInformation().update(newTerm, "");
228 leader = new Leader(actorContext);
229 actorContext.setCurrentBehavior(leader);
231 // Leader will send an immediate heartbeat - ignore it.
232 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
234 // The follower replies with the leader's current last index and term, simulating that it is
235 // up to date with the leader.
236 long lastIndex = actorContext.getReplicatedLog().lastIndex();
237 leader.handleMessage(followerActor, new AppendEntriesReply(
238 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
240 // The commit index should not get updated even though consensus was reached. This is b/c the
241 // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
242 // from previous terms by counting replicas".
243 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
245 followerActor.underlyingActor().clear();
247 // Now replicate a new entry with the new term 2.
248 long newIndex = lastIndex + 1;
249 sendReplicate(actorContext, newTerm, newIndex);
251 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
252 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
253 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
254 assertEquals("Entries size", 1, appendEntries.getEntries().size());
255 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
256 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
257 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
259 // The follower replies with success. The leader should now update the commit index to the new index
260 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
261 // prior entries are committed indirectly".
262 leader.handleMessage(followerActor, new AppendEntriesReply(
263 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
265 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
269 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() {
270 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
272 MockRaftActorContext actorContext = createActorContextWithFollower();
273 actorContext.setRaftPolicy(createRaftPolicy(true, true));
276 actorContext.getTermInformation().update(term, "");
278 leader = new Leader(actorContext);
280 // Leader will send an immediate heartbeat - ignore it.
281 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
283 // The follower would normally reply - simulate that explicitly here.
284 long lastIndex = actorContext.getReplicatedLog().lastIndex();
285 leader.handleMessage(followerActor, new AppendEntriesReply(
286 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
287 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
289 followerActor.underlyingActor().clear();
291 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
293 // State should not change
294 assertTrue(raftBehavior instanceof Leader);
296 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
297 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
298 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
299 assertEquals("Entries size", 1, appendEntries.getEntries().size());
300 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
301 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
302 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
303 assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
307 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
308 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
310 MockRaftActorContext actorContext = createActorContextWithFollower();
311 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
313 public FiniteDuration getHeartBeatInterval() {
314 return FiniteDuration.apply(5, TimeUnit.SECONDS);
319 actorContext.getTermInformation().update(term, "");
321 leader = new Leader(actorContext);
323 // Leader will send an immediate heartbeat - ignore it.
324 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
326 // The follower would normally reply - simulate that explicitly here.
327 long lastIndex = actorContext.getReplicatedLog().lastIndex();
328 leader.handleMessage(followerActor, new AppendEntriesReply(
329 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
330 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
332 followerActor.underlyingActor().clear();
334 for (int i = 0; i < 5; i++) {
335 sendReplicate(actorContext, lastIndex + i + 1);
338 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
339 // We expect only 1 message to be sent because of two reasons,
340 // - an append entries reply was not received
341 // - the heartbeat interval has not expired
342 // In this scenario if multiple messages are sent they would likely be duplicates
343 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
347 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
348 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
350 MockRaftActorContext actorContext = createActorContextWithFollower();
351 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
353 public FiniteDuration getHeartBeatInterval() {
354 return FiniteDuration.apply(5, TimeUnit.SECONDS);
359 actorContext.getTermInformation().update(term, "");
361 leader = new Leader(actorContext);
363 // Leader will send an immediate heartbeat - ignore it.
364 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
366 // The follower would normally reply - simulate that explicitly here.
367 long lastIndex = actorContext.getReplicatedLog().lastIndex();
368 leader.handleMessage(followerActor, new AppendEntriesReply(
369 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
370 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
372 followerActor.underlyingActor().clear();
374 for (int i = 0; i < 3; i++) {
375 sendReplicate(actorContext, lastIndex + i + 1);
376 leader.handleMessage(followerActor, new AppendEntriesReply(
377 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
381 for (int i = 3; i < 5; i++) {
382 sendReplicate(actorContext, lastIndex + i + 1);
385 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
386 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
387 // get sent to the follower - but not the 5th
388 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
390 for (int i = 0; i < 4; i++) {
391 long expected = allMessages.get(i).getEntries().get(0).getIndex();
392 assertEquals(expected, i + 2);
397 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
398 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
400 MockRaftActorContext actorContext = createActorContextWithFollower();
401 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
403 public FiniteDuration getHeartBeatInterval() {
404 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
409 actorContext.getTermInformation().update(term, "");
411 leader = new Leader(actorContext);
413 // Leader will send an immediate heartbeat - ignore it.
414 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
416 // The follower would normally reply - simulate that explicitly here.
417 long lastIndex = actorContext.getReplicatedLog().lastIndex();
418 leader.handleMessage(followerActor, new AppendEntriesReply(
419 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
420 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
422 followerActor.underlyingActor().clear();
424 sendReplicate(actorContext, lastIndex + 1);
426 // Wait slightly longer than heartbeat duration
427 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
429 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
431 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
432 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
434 assertEquals(1, allMessages.get(0).getEntries().size());
435 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
436 assertEquals(1, allMessages.get(1).getEntries().size());
437 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
442 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
443 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
445 MockRaftActorContext actorContext = createActorContextWithFollower();
446 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
448 public FiniteDuration getHeartBeatInterval() {
449 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
454 actorContext.getTermInformation().update(term, "");
456 leader = new Leader(actorContext);
458 // Leader will send an immediate heartbeat - ignore it.
459 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
461 // The follower would normally reply - simulate that explicitly here.
462 long lastIndex = actorContext.getReplicatedLog().lastIndex();
463 leader.handleMessage(followerActor, new AppendEntriesReply(
464 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
465 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
467 followerActor.underlyingActor().clear();
469 for (int i = 0; i < 3; i++) {
470 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
471 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
474 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
475 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
479 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
480 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
482 MockRaftActorContext actorContext = createActorContextWithFollower();
483 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
485 public FiniteDuration getHeartBeatInterval() {
486 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
491 actorContext.getTermInformation().update(term, "");
493 leader = new Leader(actorContext);
495 // Leader will send an immediate heartbeat - ignore it.
496 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
498 // The follower would normally reply - simulate that explicitly here.
499 long lastIndex = actorContext.getReplicatedLog().lastIndex();
500 leader.handleMessage(followerActor, new AppendEntriesReply(
501 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
502 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
504 followerActor.underlyingActor().clear();
506 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
507 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
508 sendReplicate(actorContext, lastIndex + 1);
510 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
511 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
513 assertEquals(0, allMessages.get(0).getEntries().size());
514 assertEquals(1, allMessages.get(1).getEntries().size());
519 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
520 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
522 MockRaftActorContext actorContext = createActorContext();
524 leader = new Leader(actorContext);
526 actorContext.setLastApplied(0);
528 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
529 long term = actorContext.getTermInformation().getCurrentTerm();
530 ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
531 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
533 actorContext.getReplicatedLog().append(newEntry);
535 final Identifier id = new MockIdentifier("state-id");
536 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
537 new Replicate(leaderActor, id, newEntry, true));
539 // State should not change
540 assertTrue(raftBehavior instanceof Leader);
542 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
544 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
545 // one since lastApplied state is 0.
546 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
547 leaderActor, ApplyState.class);
548 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
550 for (int i = 0; i <= newLogIndex - 1; i++) {
551 ApplyState applyState = applyStateList.get(i);
552 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
553 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
556 ApplyState last = applyStateList.get((int) newLogIndex - 1);
557 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
558 assertEquals("getIdentifier", id, last.getIdentifier());
562 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
563 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
565 final MockRaftActorContext actorContext = createActorContextWithFollower();
567 Map<String, String> leadersSnapshot = new HashMap<>();
568 leadersSnapshot.put("1", "A");
569 leadersSnapshot.put("2", "B");
570 leadersSnapshot.put("3", "C");
573 actorContext.getReplicatedLog().removeFrom(0);
575 final int commitIndex = 3;
576 final int snapshotIndex = 2;
577 final int snapshotTerm = 1;
579 // set the snapshot variables in replicatedlog
580 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
581 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
582 actorContext.setCommitIndex(commitIndex);
583 //set follower timeout to 2 mins, helps during debugging
584 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
586 leader = new Leader(actorContext);
588 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
589 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
591 //update follower timestamp
592 leader.markFollowerActive(FOLLOWER_ID);
594 ByteString bs = toByteString(leadersSnapshot);
595 leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
596 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
597 -1, null, null), ByteSource.wrap(bs.toByteArray())));
598 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
599 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
600 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
601 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
603 //send first chunk and no InstallSnapshotReply received yet
605 fts.incrementChunkIndex();
607 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
608 TimeUnit.MILLISECONDS);
610 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
612 AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
614 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
616 //InstallSnapshotReply received
617 fts.markSendStatus(true);
619 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
621 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
623 assertEquals(commitIndex, is.getLastIncludedIndex());
627 public void testSendAppendEntriesSnapshotScenario() {
628 logStart("testSendAppendEntriesSnapshotScenario");
630 final MockRaftActorContext actorContext = createActorContextWithFollower();
632 Map<String, String> leadersSnapshot = new HashMap<>();
633 leadersSnapshot.put("1", "A");
634 leadersSnapshot.put("2", "B");
635 leadersSnapshot.put("3", "C");
638 actorContext.getReplicatedLog().removeFrom(0);
640 final int followersLastIndex = 2;
641 final int snapshotIndex = 3;
642 final int newEntryIndex = 4;
643 final int snapshotTerm = 1;
644 final int currentTerm = 2;
646 // set the snapshot variables in replicatedlog
647 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
648 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
649 actorContext.setCommitIndex(followersLastIndex);
651 leader = new Leader(actorContext);
653 // Leader will send an immediate heartbeat - ignore it.
654 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
657 SimpleReplicatedLogEntry entry =
658 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
659 new MockRaftActorContext.MockPayload("D"));
661 actorContext.getReplicatedLog().append(entry);
663 //update follower timestamp
664 leader.markFollowerActive(FOLLOWER_ID);
666 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
667 RaftActorBehavior raftBehavior = leader.handleMessage(
668 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
670 assertTrue(raftBehavior instanceof Leader);
672 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
676 public void testInitiateInstallSnapshot() {
677 logStart("testInitiateInstallSnapshot");
679 MockRaftActorContext actorContext = createActorContextWithFollower();
682 actorContext.getReplicatedLog().removeFrom(0);
684 final int followersLastIndex = 2;
685 final int snapshotIndex = 3;
686 final int newEntryIndex = 4;
687 final int snapshotTerm = 1;
688 final int currentTerm = 2;
690 // set the snapshot variables in replicatedlog
691 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
692 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
693 actorContext.setLastApplied(3);
694 actorContext.setCommitIndex(followersLastIndex);
696 leader = new Leader(actorContext);
698 // Leader will send an immediate heartbeat - ignore it.
699 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
701 // set the snapshot as absent and check if capture-snapshot is invoked.
702 leader.setSnapshotHolder(null);
705 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
706 new MockRaftActorContext.MockPayload("D"));
708 actorContext.getReplicatedLog().append(entry);
710 //update follower timestamp
711 leader.markFollowerActive(FOLLOWER_ID);
713 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
715 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
717 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
719 assertEquals(3, cs.getLastAppliedIndex());
720 assertEquals(1, cs.getLastAppliedTerm());
721 assertEquals(4, cs.getLastIndex());
722 assertEquals(2, cs.getLastTerm());
724 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
725 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
727 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
731 public void testInitiateForceInstallSnapshot() throws Exception {
732 logStart("testInitiateForceInstallSnapshot");
734 MockRaftActorContext actorContext = createActorContextWithFollower();
736 final int followersLastIndex = 2;
737 final int snapshotIndex = -1;
738 final int newEntryIndex = 4;
739 final int snapshotTerm = -1;
740 final int currentTerm = 2;
742 // set the snapshot variables in replicatedlog
743 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
744 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
745 actorContext.setLastApplied(3);
746 actorContext.setCommitIndex(followersLastIndex);
748 actorContext.getReplicatedLog().removeFrom(0);
750 AtomicReference<java.util.Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
751 actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
753 leader = new Leader(actorContext);
754 actorContext.setCurrentBehavior(leader);
756 // Leader will send an immediate heartbeat - ignore it.
757 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
759 // set the snapshot as absent and check if capture-snapshot is invoked.
760 leader.setSnapshotHolder(null);
762 for (int i = 0; i < 4; i++) {
763 actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
764 new MockRaftActorContext.MockPayload("X" + i)));
768 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
769 new MockRaftActorContext.MockPayload("D"));
771 actorContext.getReplicatedLog().append(entry);
773 //update follower timestamp
774 leader.markFollowerActive(FOLLOWER_ID);
776 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
777 // installed with a SendInstallSnapshot
778 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
780 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
782 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
783 assertEquals(3, cs.getLastAppliedIndex());
784 assertEquals(1, cs.getLastAppliedTerm());
785 assertEquals(4, cs.getLastIndex());
786 assertEquals(2, cs.getLastTerm());
788 assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
789 assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
791 MessageCollectorActor.clearMessages(followerActor);
793 // Sending Replicate message should not initiate another capture since the first is in progress.
794 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
795 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
797 // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
798 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
799 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
801 // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
802 final byte[] bytes = new byte[]{1, 2, 3};
803 installSnapshotStream.get().get().write(bytes);
804 actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
805 Runtime.getRuntime().totalMemory());
806 MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
808 // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
809 MessageCollectorActor.clearMessages(followerActor);
810 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
811 MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
816 public void testInstallSnapshot() {
817 logStart("testInstallSnapshot");
819 final MockRaftActorContext actorContext = createActorContextWithFollower();
821 Map<String, String> leadersSnapshot = new HashMap<>();
822 leadersSnapshot.put("1", "A");
823 leadersSnapshot.put("2", "B");
824 leadersSnapshot.put("3", "C");
827 actorContext.getReplicatedLog().removeFrom(0);
829 final int lastAppliedIndex = 3;
830 final int snapshotIndex = 2;
831 final int snapshotTerm = 1;
832 final int currentTerm = 2;
834 // set the snapshot variables in replicatedlog
835 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
836 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
837 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
838 actorContext.setCommitIndex(lastAppliedIndex);
839 actorContext.setLastApplied(lastAppliedIndex);
841 leader = new Leader(actorContext);
843 // Initial heartbeat.
844 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
846 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
847 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
849 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
850 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
851 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
853 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
854 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
856 assertTrue(raftBehavior instanceof Leader);
858 // check if installsnapshot gets called with the correct values.
860 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
861 InstallSnapshot.class);
863 assertNotNull(installSnapshot.getData());
864 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
865 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
867 assertEquals(currentTerm, installSnapshot.getTerm());
871 public void testForceInstallSnapshot() {
872 logStart("testForceInstallSnapshot");
874 final MockRaftActorContext actorContext = createActorContextWithFollower();
876 Map<String, String> leadersSnapshot = new HashMap<>();
877 leadersSnapshot.put("1", "A");
878 leadersSnapshot.put("2", "B");
879 leadersSnapshot.put("3", "C");
881 final int lastAppliedIndex = 3;
882 final int snapshotIndex = -1;
883 final int snapshotTerm = -1;
884 final int currentTerm = 2;
886 // set the snapshot variables in replicatedlog
887 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
888 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
889 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
890 actorContext.setCommitIndex(lastAppliedIndex);
891 actorContext.setLastApplied(lastAppliedIndex);
893 leader = new Leader(actorContext);
895 // Initial heartbeat.
896 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
898 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
899 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
901 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
902 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
903 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
905 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
906 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
908 assertTrue(raftBehavior instanceof Leader);
910 // check if installsnapshot gets called with the correct values.
912 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
913 InstallSnapshot.class);
915 assertNotNull(installSnapshot.getData());
916 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
917 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
919 assertEquals(currentTerm, installSnapshot.getTerm());
923 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
924 logStart("testHandleInstallSnapshotReplyLastChunk");
926 MockRaftActorContext actorContext = createActorContextWithFollower();
928 final int commitIndex = 3;
929 final int snapshotIndex = 2;
930 final int snapshotTerm = 1;
931 final int currentTerm = 2;
933 actorContext.setCommitIndex(commitIndex);
935 leader = new Leader(actorContext);
936 actorContext.setCurrentBehavior(leader);
938 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
939 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
941 // Ignore initial heartbeat.
942 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
944 Map<String, String> leadersSnapshot = new HashMap<>();
945 leadersSnapshot.put("1", "A");
946 leadersSnapshot.put("2", "B");
947 leadersSnapshot.put("3", "C");
949 // set the snapshot variables in replicatedlog
951 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
952 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
953 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
955 ByteString bs = toByteString(leadersSnapshot);
956 leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
957 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
958 -1, null, null), ByteSource.wrap(bs.toByteArray())));
959 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
960 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
961 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
962 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
963 while (!fts.isLastChunk(fts.getChunkIndex())) {
965 fts.incrementChunkIndex();
969 actorContext.getReplicatedLog().removeFrom(0);
971 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
972 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
974 assertTrue(raftBehavior instanceof Leader);
976 assertEquals(1, leader.followerLogSize());
977 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
979 assertNull(fli.getInstallSnapshotState());
980 assertEquals(commitIndex, fli.getMatchIndex());
981 assertEquals(commitIndex + 1, fli.getNextIndex());
982 assertFalse(leader.hasSnapshot());
986 public void testSendSnapshotfromInstallSnapshotReply() {
987 logStart("testSendSnapshotfromInstallSnapshotReply");
989 MockRaftActorContext actorContext = createActorContextWithFollower();
991 final int commitIndex = 3;
992 final int snapshotIndex = 2;
993 final int snapshotTerm = 1;
994 final int currentTerm = 2;
996 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
998 public int getSnapshotChunkSize() {
1002 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1003 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1005 actorContext.setConfigParams(configParams);
1006 actorContext.setCommitIndex(commitIndex);
1008 leader = new Leader(actorContext);
1009 actorContext.setCurrentBehavior(leader);
1011 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1012 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1014 Map<String, String> leadersSnapshot = new HashMap<>();
1015 leadersSnapshot.put("1", "A");
1016 leadersSnapshot.put("2", "B");
1017 leadersSnapshot.put("3", "C");
1019 // set the snapshot variables in replicatedlog
1020 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1021 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1022 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1024 ByteString bs = toByteString(leadersSnapshot);
1025 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1026 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1029 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1031 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1032 InstallSnapshot.class);
1034 assertEquals(1, installSnapshot.getChunkIndex());
1035 assertEquals(3, installSnapshot.getTotalChunks());
1037 followerActor.underlyingActor().clear();
1038 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1039 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1041 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1043 assertEquals(2, installSnapshot.getChunkIndex());
1044 assertEquals(3, installSnapshot.getTotalChunks());
1046 followerActor.underlyingActor().clear();
1047 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1048 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1050 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1052 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1053 followerActor.underlyingActor().clear();
1054 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1055 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1057 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1059 assertNull(installSnapshot);
1064 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() {
1065 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1067 MockRaftActorContext actorContext = createActorContextWithFollower();
1069 final int commitIndex = 3;
1070 final int snapshotIndex = 2;
1071 final int snapshotTerm = 1;
1072 final int currentTerm = 2;
1074 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1076 public int getSnapshotChunkSize() {
1081 actorContext.setCommitIndex(commitIndex);
1083 leader = new Leader(actorContext);
1085 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1086 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1088 Map<String, String> leadersSnapshot = new HashMap<>();
1089 leadersSnapshot.put("1", "A");
1090 leadersSnapshot.put("2", "B");
1091 leadersSnapshot.put("3", "C");
1093 // set the snapshot variables in replicatedlog
1094 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1095 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1096 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1098 ByteString bs = toByteString(leadersSnapshot);
1099 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1100 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1103 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1104 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1106 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1107 InstallSnapshot.class);
1109 assertEquals(1, installSnapshot.getChunkIndex());
1110 assertEquals(3, installSnapshot.getTotalChunks());
1112 followerActor.underlyingActor().clear();
1114 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1115 FOLLOWER_ID, -1, false));
1117 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1118 TimeUnit.MILLISECONDS);
1120 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1122 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1124 assertEquals(1, installSnapshot.getChunkIndex());
1125 assertEquals(3, installSnapshot.getTotalChunks());
1129 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() {
1130 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1132 MockRaftActorContext actorContext = createActorContextWithFollower();
1134 final int commitIndex = 3;
1135 final int snapshotIndex = 2;
1136 final int snapshotTerm = 1;
1137 final int currentTerm = 2;
1139 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1141 public int getSnapshotChunkSize() {
1146 actorContext.setCommitIndex(commitIndex);
1148 leader = new Leader(actorContext);
1150 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1151 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1153 Map<String, String> leadersSnapshot = new HashMap<>();
1154 leadersSnapshot.put("1", "A");
1155 leadersSnapshot.put("2", "B");
1156 leadersSnapshot.put("3", "C");
1158 // set the snapshot variables in replicatedlog
1159 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1160 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1161 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1163 ByteString bs = toByteString(leadersSnapshot);
1164 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1165 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1168 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1170 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1171 InstallSnapshot.class);
1173 assertEquals(1, installSnapshot.getChunkIndex());
1174 assertEquals(3, installSnapshot.getTotalChunks());
1175 assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1176 installSnapshot.getLastChunkHashCode().get().intValue());
1178 final int hashCode = Arrays.hashCode(installSnapshot.getData());
1180 followerActor.underlyingActor().clear();
1182 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1183 FOLLOWER_ID, 1, true));
1185 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1187 assertEquals(2, installSnapshot.getChunkIndex());
1188 assertEquals(3, installSnapshot.getTotalChunks());
1189 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1193 public void testLeaderInstallSnapshotState() throws IOException {
1194 logStart("testLeaderInstallSnapshotState");
1196 Map<String, String> leadersSnapshot = new HashMap<>();
1197 leadersSnapshot.put("1", "A");
1198 leadersSnapshot.put("2", "B");
1199 leadersSnapshot.put("3", "C");
1201 ByteString bs = toByteString(leadersSnapshot);
1202 byte[] barray = bs.toByteArray();
1204 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1205 fts.setSnapshotBytes(ByteSource.wrap(barray));
1207 assertEquals(bs.size(), barray.length);
1210 for (int i = 0; i < barray.length; i = i + 50) {
1211 int length = i + 50;
1214 if (i + 50 > barray.length) {
1215 length = barray.length;
1218 byte[] chunk = fts.getNextChunk();
1219 assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1220 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1222 fts.markSendStatus(true);
1223 if (!fts.isLastChunk(chunkIndex)) {
1224 fts.incrementChunkIndex();
1228 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1233 protected Leader createBehavior(final RaftActorContext actorContext) {
1234 return new Leader(actorContext);
1238 protected MockRaftActorContext createActorContext() {
1239 return createActorContext(leaderActor);
1243 protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
1244 return createActorContext(LEADER_ID, actorRef);
1247 private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
1248 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1249 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1250 configParams.setElectionTimeoutFactor(100000);
1251 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1252 context.setConfigParams(configParams);
1253 context.setPayloadVersion(payloadVersion);
1257 private MockRaftActorContext createActorContextWithFollower() {
1258 MockRaftActorContext actorContext = createActorContext();
1259 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1260 followerActor.path().toString()).build());
1261 return actorContext;
1264 private MockRaftActorContext createFollowerActorContextWithLeader() {
1265 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1266 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1267 followerConfig.setElectionTimeoutFactor(10000);
1268 followerActorContext.setConfigParams(followerConfig);
1269 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1270 return followerActorContext;
1274 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() {
1275 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1277 final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1279 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1281 Follower follower = new Follower(followerActorContext);
1282 followerActor.underlyingActor().setBehavior(follower);
1283 followerActorContext.setCurrentBehavior(follower);
1285 Map<String, String> peerAddresses = new HashMap<>();
1286 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1288 leaderActorContext.setPeerAddresses(peerAddresses);
1290 leaderActorContext.getReplicatedLog().removeFrom(0);
1293 leaderActorContext.setReplicatedLog(
1294 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1296 leaderActorContext.setCommitIndex(1);
1298 followerActorContext.getReplicatedLog().removeFrom(0);
1300 // follower too has the exact same log entries and has the same commit index
1301 followerActorContext.setReplicatedLog(
1302 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1304 followerActorContext.setCommitIndex(1);
1306 leader = new Leader(leaderActorContext);
1307 leaderActorContext.setCurrentBehavior(leader);
1309 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1311 assertEquals(-1, appendEntries.getLeaderCommit());
1312 assertEquals(0, appendEntries.getEntries().size());
1313 assertEquals(0, appendEntries.getPrevLogIndex());
1315 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1316 leaderActor, AppendEntriesReply.class);
1318 assertEquals(2, appendEntriesReply.getLogLastIndex());
1319 assertEquals(1, appendEntriesReply.getLogLastTerm());
1321 // follower returns its next index
1322 assertEquals(2, appendEntriesReply.getLogLastIndex());
1323 assertEquals(1, appendEntriesReply.getLogLastTerm());
1329 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() {
1330 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1332 final MockRaftActorContext leaderActorContext = createActorContext();
1334 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1335 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1337 Follower follower = new Follower(followerActorContext);
1338 followerActor.underlyingActor().setBehavior(follower);
1339 followerActorContext.setCurrentBehavior(follower);
1341 Map<String, String> leaderPeerAddresses = new HashMap<>();
1342 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1344 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1346 leaderActorContext.getReplicatedLog().removeFrom(0);
1348 leaderActorContext.setReplicatedLog(
1349 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1351 leaderActorContext.setCommitIndex(1);
1353 followerActorContext.getReplicatedLog().removeFrom(0);
1355 followerActorContext.setReplicatedLog(
1356 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1358 // follower has the same log entries but its commit index > leaders commit index
1359 followerActorContext.setCommitIndex(2);
1361 leader = new Leader(leaderActorContext);
1363 // Initial heartbeat
1364 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1366 assertEquals(-1, appendEntries.getLeaderCommit());
1367 assertEquals(0, appendEntries.getEntries().size());
1368 assertEquals(0, appendEntries.getPrevLogIndex());
1370 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1371 leaderActor, AppendEntriesReply.class);
1373 assertEquals(2, appendEntriesReply.getLogLastIndex());
1374 assertEquals(1, appendEntriesReply.getLogLastTerm());
1376 leaderActor.underlyingActor().setBehavior(follower);
1377 leader.handleMessage(followerActor, appendEntriesReply);
1379 leaderActor.underlyingActor().clear();
1380 followerActor.underlyingActor().clear();
1382 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1383 TimeUnit.MILLISECONDS);
1385 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1387 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1389 assertEquals(2, appendEntries.getLeaderCommit());
1390 assertEquals(0, appendEntries.getEntries().size());
1391 assertEquals(2, appendEntries.getPrevLogIndex());
1393 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1395 assertEquals(2, appendEntriesReply.getLogLastIndex());
1396 assertEquals(1, appendEntriesReply.getLogLastTerm());
1398 assertEquals(2, followerActorContext.getCommitIndex());
1404 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1405 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1407 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1408 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1409 new FiniteDuration(1000, TimeUnit.SECONDS));
1411 leaderActorContext.setReplicatedLog(
1412 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1413 long leaderCommitIndex = 2;
1414 leaderActorContext.setCommitIndex(leaderCommitIndex);
1415 leaderActorContext.setLastApplied(leaderCommitIndex);
1417 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1418 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1420 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1422 followerActorContext.setReplicatedLog(
1423 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1424 followerActorContext.setCommitIndex(0);
1425 followerActorContext.setLastApplied(0);
1427 Follower follower = new Follower(followerActorContext);
1428 followerActor.underlyingActor().setBehavior(follower);
1430 leader = new Leader(leaderActorContext);
1432 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1433 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1434 AppendEntriesReply.class);
1436 MessageCollectorActor.clearMessages(followerActor);
1437 MessageCollectorActor.clearMessages(leaderActor);
1439 // Verify initial AppendEntries sent.
1440 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1441 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1442 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1444 leaderActor.underlyingActor().setBehavior(leader);
1446 leader.handleMessage(followerActor, appendEntriesReply);
1448 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1449 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1451 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1452 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1453 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1455 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1456 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1457 appendEntries.getEntries().get(0).getData());
1458 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1459 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1460 appendEntries.getEntries().get(1).getData());
1462 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1463 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1465 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1467 ApplyState applyState = applyStateList.get(0);
1468 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1469 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1470 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1471 applyState.getReplicatedLogEntry().getData());
1473 applyState = applyStateList.get(1);
1474 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1475 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1476 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1477 applyState.getReplicatedLogEntry().getData());
1479 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1480 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1484 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1485 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1487 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1488 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1489 new FiniteDuration(1000, TimeUnit.SECONDS));
1491 leaderActorContext.setReplicatedLog(
1492 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1493 long leaderCommitIndex = 1;
1494 leaderActorContext.setCommitIndex(leaderCommitIndex);
1495 leaderActorContext.setLastApplied(leaderCommitIndex);
1497 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1498 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1500 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1502 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1503 followerActorContext.setCommitIndex(-1);
1504 followerActorContext.setLastApplied(-1);
1506 Follower follower = new Follower(followerActorContext);
1507 followerActor.underlyingActor().setBehavior(follower);
1508 followerActorContext.setCurrentBehavior(follower);
1510 leader = new Leader(leaderActorContext);
1512 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1513 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1514 AppendEntriesReply.class);
1516 MessageCollectorActor.clearMessages(followerActor);
1517 MessageCollectorActor.clearMessages(leaderActor);
1519 // Verify initial AppendEntries sent with the leader's current commit index.
1520 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1521 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1522 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1524 leaderActor.underlyingActor().setBehavior(leader);
1525 leaderActorContext.setCurrentBehavior(leader);
1527 leader.handleMessage(followerActor, appendEntriesReply);
1529 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1530 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1532 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1533 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1534 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1536 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1537 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1538 appendEntries.getEntries().get(0).getData());
1539 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1540 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1541 appendEntries.getEntries().get(1).getData());
1543 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1544 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1546 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1548 ApplyState applyState = applyStateList.get(0);
1549 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1550 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1551 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1552 applyState.getReplicatedLogEntry().getData());
1554 applyState = applyStateList.get(1);
1555 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1556 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1557 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1558 applyState.getReplicatedLogEntry().getData());
1560 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1561 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1565 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1566 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1568 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1569 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1570 new FiniteDuration(1000, TimeUnit.SECONDS));
1572 leaderActorContext.setReplicatedLog(
1573 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1574 long leaderCommitIndex = 1;
1575 leaderActorContext.setCommitIndex(leaderCommitIndex);
1576 leaderActorContext.setLastApplied(leaderCommitIndex);
1578 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1579 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1581 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1583 followerActorContext.setReplicatedLog(
1584 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1585 followerActorContext.setCommitIndex(-1);
1586 followerActorContext.setLastApplied(-1);
1588 Follower follower = new Follower(followerActorContext);
1589 followerActor.underlyingActor().setBehavior(follower);
1590 followerActorContext.setCurrentBehavior(follower);
1592 leader = new Leader(leaderActorContext);
1594 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1595 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1596 AppendEntriesReply.class);
1598 MessageCollectorActor.clearMessages(followerActor);
1599 MessageCollectorActor.clearMessages(leaderActor);
1601 // Verify initial AppendEntries sent with the leader's current commit index.
1602 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1603 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1604 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1606 leaderActor.underlyingActor().setBehavior(leader);
1607 leaderActorContext.setCurrentBehavior(leader);
1609 leader.handleMessage(followerActor, appendEntriesReply);
1611 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1612 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1614 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1615 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1616 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1618 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1619 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1620 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1621 appendEntries.getEntries().get(0).getData());
1622 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1623 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1624 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1625 appendEntries.getEntries().get(1).getData());
1627 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1628 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1630 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1632 ApplyState applyState = applyStateList.get(0);
1633 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1634 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1635 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1636 applyState.getReplicatedLogEntry().getData());
1638 applyState = applyStateList.get(1);
1639 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1640 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1641 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1642 applyState.getReplicatedLogEntry().getData());
1644 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1645 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1646 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1650 public void testHandleAppendEntriesReplyWithNewerTerm() {
1651 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1653 MockRaftActorContext leaderActorContext = createActorContext();
1654 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1655 new FiniteDuration(10000, TimeUnit.SECONDS));
1657 leaderActorContext.setReplicatedLog(
1658 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1660 leader = new Leader(leaderActorContext);
1661 leaderActor.underlyingActor().setBehavior(leader);
1662 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1664 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1665 AppendEntriesReply.class);
1667 assertEquals(false, appendEntriesReply.isSuccess());
1668 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1670 MessageCollectorActor.clearMessages(leaderActor);
1674 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1675 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1677 MockRaftActorContext leaderActorContext = createActorContext();
1678 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1679 new FiniteDuration(10000, TimeUnit.SECONDS));
1681 leaderActorContext.setReplicatedLog(
1682 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1683 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1685 leader = new Leader(leaderActorContext);
1686 leaderActor.underlyingActor().setBehavior(leader);
1687 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1689 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1690 AppendEntriesReply.class);
1692 assertEquals(false, appendEntriesReply.isSuccess());
1693 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1695 MessageCollectorActor.clearMessages(leaderActor);
1699 public void testHandleAppendEntriesReplySuccess() {
1700 logStart("testHandleAppendEntriesReplySuccess");
1702 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1704 leaderActorContext.setReplicatedLog(
1705 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1707 leaderActorContext.setCommitIndex(1);
1708 leaderActorContext.setLastApplied(1);
1709 leaderActorContext.getTermInformation().update(1, "leader");
1711 leader = new Leader(leaderActorContext);
1713 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1715 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1716 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1718 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1720 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1722 assertEquals(RaftState.Leader, raftActorBehavior.state());
1724 assertEquals(2, leaderActorContext.getCommitIndex());
1726 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1727 leaderActor, ApplyJournalEntries.class);
1729 assertEquals(2, leaderActorContext.getLastApplied());
1731 assertEquals(2, applyJournalEntries.getToIndex());
1733 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1736 assertEquals(1,applyStateList.size());
1738 ApplyState applyState = applyStateList.get(0);
1740 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1742 assertEquals(2, followerInfo.getMatchIndex());
1743 assertEquals(3, followerInfo.getNextIndex());
1744 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1745 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1749 public void testHandleAppendEntriesReplyUnknownFollower() {
1750 logStart("testHandleAppendEntriesReplyUnknownFollower");
1752 MockRaftActorContext leaderActorContext = createActorContext();
1754 leader = new Leader(leaderActorContext);
1756 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1758 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1760 assertEquals(RaftState.Leader, raftActorBehavior.state());
1764 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1765 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1767 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1768 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1769 new FiniteDuration(1000, TimeUnit.SECONDS));
1770 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1772 leaderActorContext.setReplicatedLog(
1773 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1774 long leaderCommitIndex = 3;
1775 leaderActorContext.setCommitIndex(leaderCommitIndex);
1776 leaderActorContext.setLastApplied(leaderCommitIndex);
1778 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1779 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1780 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1781 final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1783 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1785 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1786 followerActorContext.setCommitIndex(-1);
1787 followerActorContext.setLastApplied(-1);
1789 Follower follower = new Follower(followerActorContext);
1790 followerActor.underlyingActor().setBehavior(follower);
1791 followerActorContext.setCurrentBehavior(follower);
1793 leader = new Leader(leaderActorContext);
1795 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1796 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1797 AppendEntriesReply.class);
1799 MessageCollectorActor.clearMessages(followerActor);
1800 MessageCollectorActor.clearMessages(leaderActor);
1802 // Verify initial AppendEntries sent with the leader's current commit index.
1803 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1804 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1805 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1807 leaderActor.underlyingActor().setBehavior(leader);
1808 leaderActorContext.setCurrentBehavior(leader);
1810 leader.handleMessage(followerActor, appendEntriesReply);
1812 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1813 AppendEntries.class, 2);
1814 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1816 appendEntries = appendEntriesList.get(0);
1817 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1818 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1819 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1821 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1822 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1823 appendEntries.getEntries().get(0).getData());
1824 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1825 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1826 appendEntries.getEntries().get(1).getData());
1828 appendEntries = appendEntriesList.get(1);
1829 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1830 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1831 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1833 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1834 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1835 appendEntries.getEntries().get(0).getData());
1836 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1837 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1838 appendEntries.getEntries().get(1).getData());
1840 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1841 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1843 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1845 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1846 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1850 public void testHandleRequestVoteReply() {
1851 logStart("testHandleRequestVoteReply");
1853 MockRaftActorContext leaderActorContext = createActorContext();
1855 leader = new Leader(leaderActorContext);
1857 // Should be a no-op.
1858 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1859 new RequestVoteReply(1, true));
1861 assertEquals(RaftState.Leader, raftActorBehavior.state());
1863 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1865 assertEquals(RaftState.Leader, raftActorBehavior.state());
1869 public void testIsolatedLeaderCheckNoFollowers() {
1870 logStart("testIsolatedLeaderCheckNoFollowers");
1872 MockRaftActorContext leaderActorContext = createActorContext();
1874 leader = new Leader(leaderActorContext);
1875 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1876 assertTrue(newBehavior instanceof Leader);
1880 public void testIsolatedLeaderCheckNoVotingFollowers() {
1881 logStart("testIsolatedLeaderCheckNoVotingFollowers");
1883 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1884 Follower follower = new Follower(followerActorContext);
1885 followerActor.underlyingActor().setBehavior(follower);
1887 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1888 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1889 new FiniteDuration(1000, TimeUnit.SECONDS));
1890 leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1892 leader = new Leader(leaderActorContext);
1893 leader.getFollower(FOLLOWER_ID).markFollowerActive();
1894 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1895 assertTrue("Expected Leader", newBehavior instanceof Leader);
1898 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
1899 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1900 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1902 MockRaftActorContext leaderActorContext = createActorContext();
1904 Map<String, String> peerAddresses = new HashMap<>();
1905 peerAddresses.put("follower-1", followerActor1.path().toString());
1906 peerAddresses.put("follower-2", followerActor2.path().toString());
1908 leaderActorContext.setPeerAddresses(peerAddresses);
1909 leaderActorContext.setRaftPolicy(raftPolicy);
1911 leader = new Leader(leaderActorContext);
1913 leader.markFollowerActive("follower-1");
1914 leader.markFollowerActive("follower-2");
1915 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1916 assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1918 // kill 1 follower and verify if that got killed
1919 final TestKit probe = new TestKit(getSystem());
1920 probe.watch(followerActor1);
1921 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1922 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1923 assertEquals(termMsg1.getActor(), followerActor1);
1925 leader.markFollowerInActive("follower-1");
1926 leader.markFollowerActive("follower-2");
1927 newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1928 assertTrue("Behavior not instance of Leader when majority of followers are active",
1929 newBehavior instanceof Leader);
1931 // kill 2nd follower and leader should change to Isolated leader
1932 followerActor2.tell(PoisonPill.getInstance(), null);
1933 probe.watch(followerActor2);
1934 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1935 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1936 assertEquals(termMsg2.getActor(), followerActor2);
1938 leader.markFollowerInActive("follower-2");
1939 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1943 public void testIsolatedLeaderCheckTwoFollowers() {
1944 logStart("testIsolatedLeaderCheckTwoFollowers");
1946 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1948 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1949 newBehavior instanceof IsolatedLeader);
1953 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() {
1954 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1956 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1958 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1959 newBehavior instanceof Leader);
1963 public void testLaggingFollowerStarvation() {
1964 logStart("testLaggingFollowerStarvation");
1966 String leaderActorId = actorFactory.generateActorId("leader");
1967 String follower1ActorId = actorFactory.generateActorId("follower");
1968 String follower2ActorId = actorFactory.generateActorId("follower");
1970 final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1971 final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1973 MockRaftActorContext leaderActorContext =
1974 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1976 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1977 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1978 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1980 leaderActorContext.setConfigParams(configParams);
1982 leaderActorContext.setReplicatedLog(
1983 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1985 Map<String, String> peerAddresses = new HashMap<>();
1986 peerAddresses.put(follower1ActorId,
1987 follower1Actor.path().toString());
1988 peerAddresses.put(follower2ActorId,
1989 follower2Actor.path().toString());
1991 leaderActorContext.setPeerAddresses(peerAddresses);
1992 leaderActorContext.getTermInformation().update(1, leaderActorId);
1994 leader = createBehavior(leaderActorContext);
1996 leaderActor.underlyingActor().setBehavior(leader);
1998 for (int i = 1; i < 6; i++) {
1999 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
2000 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
2001 new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
2002 assertTrue(newBehavior == leader);
2003 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
2006 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
2007 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
2009 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
2010 heartbeats.size() > 1);
2012 // Check if follower-2 got AppendEntries during this time and was not starved
2013 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
2015 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
2016 appendEntries.size() > 1);
2020 public void testReplicationConsensusWithNonVotingFollower() {
2021 logStart("testReplicationConsensusWithNonVotingFollower");
2023 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2024 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2025 new FiniteDuration(1000, TimeUnit.SECONDS));
2027 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2028 leaderActorContext.setCommitIndex(-1);
2029 leaderActorContext.setLastApplied(-1);
2031 String nonVotingFollowerId = "nonvoting-follower";
2032 ActorRef nonVotingFollowerActor = actorFactory.createActor(
2033 MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
2035 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2036 VotingState.NON_VOTING);
2038 leader = new Leader(leaderActorContext);
2039 leaderActorContext.setCurrentBehavior(leader);
2041 // Ignore initial heartbeats
2042 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2043 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2045 MessageCollectorActor.clearMessages(followerActor);
2046 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2047 MessageCollectorActor.clearMessages(leaderActor);
2049 // Send a Replicate message and wait for AppendEntries.
2050 sendReplicate(leaderActorContext, 0);
2052 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2053 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2055 // Send reply only from the voting follower and verify consensus via ApplyState.
2056 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2058 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2060 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2062 MessageCollectorActor.clearMessages(followerActor);
2063 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2064 MessageCollectorActor.clearMessages(leaderActor);
2066 // Send another Replicate message
2067 sendReplicate(leaderActorContext, 1);
2069 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2070 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2071 AppendEntries.class);
2072 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2073 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2075 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2076 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2078 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2080 // Send reply from the voting follower and verify consensus.
2081 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2083 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2087 public void testTransferLeadershipWithFollowerInSync() {
2088 logStart("testTransferLeadershipWithFollowerInSync");
2090 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2091 leaderActorContext.setLastApplied(-1);
2092 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2093 new FiniteDuration(1000, TimeUnit.SECONDS));
2094 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2096 leader = new Leader(leaderActorContext);
2097 leaderActorContext.setCurrentBehavior(leader);
2099 // Initial heartbeat
2100 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2101 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2102 MessageCollectorActor.clearMessages(followerActor);
2104 sendReplicate(leaderActorContext, 0);
2105 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2107 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2108 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2109 MessageCollectorActor.clearMessages(followerActor);
2111 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2112 leader.transferLeadership(mockTransferCohort);
2114 verify(mockTransferCohort, never()).transferComplete();
2115 doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2116 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2117 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2119 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2120 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2122 // Leader should force an election timeout
2123 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2125 verify(mockTransferCohort).transferComplete();
2129 public void testTransferLeadershipWithEmptyLog() {
2130 logStart("testTransferLeadershipWithEmptyLog");
2132 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2133 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2134 new FiniteDuration(1000, TimeUnit.SECONDS));
2135 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2137 leader = new Leader(leaderActorContext);
2138 leaderActorContext.setCurrentBehavior(leader);
2140 // Initial heartbeat
2141 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2142 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2143 MessageCollectorActor.clearMessages(followerActor);
2145 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2146 doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2147 leader.transferLeadership(mockTransferCohort);
2149 verify(mockTransferCohort, never()).transferComplete();
2150 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2151 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2153 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2154 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2156 // Leader should force an election timeout
2157 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2159 verify(mockTransferCohort).transferComplete();
2163 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2164 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2166 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2167 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2168 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2170 leader = new Leader(leaderActorContext);
2171 leaderActorContext.setCurrentBehavior(leader);
2173 // Initial heartbeat
2174 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2175 MessageCollectorActor.clearMessages(followerActor);
2177 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2178 doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2179 leader.transferLeadership(mockTransferCohort);
2181 verify(mockTransferCohort, never()).transferComplete();
2183 // Sync up the follower.
2184 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2185 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2186 MessageCollectorActor.clearMessages(followerActor);
2188 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2189 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2190 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2191 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2192 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2194 // Leader should force an election timeout
2195 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2197 verify(mockTransferCohort).transferComplete();
2201 public void testTransferLeadershipWithFollowerSyncTimeout() {
2202 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2204 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2205 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2206 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2207 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2208 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2210 leader = new Leader(leaderActorContext);
2211 leaderActorContext.setCurrentBehavior(leader);
2213 // Initial heartbeat
2214 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2215 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2216 MessageCollectorActor.clearMessages(followerActor);
2218 sendReplicate(leaderActorContext, 0);
2219 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2221 MessageCollectorActor.clearMessages(followerActor);
2223 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2224 leader.transferLeadership(mockTransferCohort);
2226 verify(mockTransferCohort, never()).transferComplete();
2228 // Send heartbeats to time out the transfer.
2229 for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2230 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2231 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2232 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2235 verify(mockTransferCohort).abortTransfer();
2236 verify(mockTransferCohort, never()).transferComplete();
2237 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2241 public void testReplicationWithPayloadSizeThatExceedsThreshold() {
2242 logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
2244 final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
2245 Arrays.asList(new SimpleReplicatedLogEntry(0, 1,
2246 new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
2247 final MockRaftActorContext.MockPayload largePayload =
2248 new MockRaftActorContext.MockPayload("large", serializedSize);
2250 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2251 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2252 new FiniteDuration(300, TimeUnit.MILLISECONDS));
2253 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
2254 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2255 leaderActorContext.setCommitIndex(-1);
2256 leaderActorContext.setLastApplied(-1);
2258 leader = new Leader(leaderActorContext);
2259 leaderActorContext.setCurrentBehavior(leader);
2261 // Send initial heartbeat reply so follower is marked active
2262 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2263 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2264 MessageCollectorActor.clearMessages(followerActor);
2266 // Send normal payload first to prime commit index.
2267 final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2268 sendReplicate(leaderActorContext, term, 0);
2270 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2271 assertEquals("Entries size", 1, appendEntries.getEntries().size());
2272 assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
2274 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
2275 assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
2276 MessageCollectorActor.clearMessages(followerActor);
2278 // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
2279 sendReplicate(leaderActorContext, term, 1, largePayload);
2281 MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2282 assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
2283 assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
2285 final Identifier slicingId = messageSlice.getIdentifier();
2287 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2288 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
2289 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
2290 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2291 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2292 MessageCollectorActor.clearMessages(followerActor);
2294 // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
2296 // Sleep for the heartbeat interval so AppendEntries is sent.
2297 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2298 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2300 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2302 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2303 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2304 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2305 MessageCollectorActor.clearMessages(followerActor);
2307 // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
2309 leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
2310 messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2311 assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
2313 leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
2315 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
2317 MessageCollectorActor.clearMessages(followerActor);
2319 // Send another normal payload.
2321 sendReplicate(leaderActorContext, term, 2);
2323 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2324 assertEquals("Entries size", 1, appendEntries.getEntries().size());
2325 assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
2326 assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
2330 public void testLargePayloadSlicingExpiration() {
2331 logStart("testLargePayloadSlicingExpiration");
2333 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2334 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2335 new FiniteDuration(100, TimeUnit.MILLISECONDS));
2336 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
2337 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
2338 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2339 leaderActorContext.setCommitIndex(-1);
2340 leaderActorContext.setLastApplied(-1);
2342 final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2343 leader = new Leader(leaderActorContext);
2344 leaderActorContext.setCurrentBehavior(leader);
2346 // Send initial heartbeat reply so follower is marked active
2347 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2348 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2349 MessageCollectorActor.clearMessages(followerActor);
2351 sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
2352 leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
2353 MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2355 // Sleep for at least 3 * election timeout so the slicing state expires.
2356 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2357 .getElectionTimeOutInterval().toMillis() * 3 + 50, TimeUnit.MILLISECONDS);
2358 MessageCollectorActor.clearMessages(followerActor);
2360 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2362 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2363 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2364 assertEquals("Entries size", 0, appendEntries.getEntries().size());
2366 MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
2367 MessageCollectorActor.clearMessages(followerActor);
2369 // Send an AppendEntriesReply - this should restart the slicing.
2371 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2372 .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
2374 leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
2376 MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2380 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
2381 final ActorRef actorRef, final RaftRPC rpc) {
2382 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2383 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2386 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2388 private final long electionTimeOutIntervalMillis;
2389 private final int snapshotChunkSize;
2391 MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
2392 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2393 this.snapshotChunkSize = snapshotChunkSize;
2397 public FiniteDuration getElectionTimeOutInterval() {
2398 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2402 public int getSnapshotChunkSize() {
2403 return snapshotChunkSize;