2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.testkit.JavaTestKit;
26 import akka.testkit.TestActorRef;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.io.ByteSource;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import com.google.protobuf.ByteString;
31 import java.util.Arrays;
32 import java.util.Collections;
33 import java.util.HashMap;
34 import java.util.List;
36 import java.util.concurrent.TimeUnit;
37 import org.junit.After;
38 import org.junit.Test;
39 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
40 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
41 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
42 import org.opendaylight.controller.cluster.raft.RaftActorContext;
43 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
44 import org.opendaylight.controller.cluster.raft.RaftState;
45 import org.opendaylight.controller.cluster.raft.RaftVersions;
46 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
47 import org.opendaylight.controller.cluster.raft.VotingState;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
49 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
50 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
51 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
54 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
55 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
60 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
61 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
62 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
63 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
64 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
65 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
66 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
67 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
68 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
69 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
70 import org.opendaylight.yangtools.concepts.Identifier;
71 import scala.concurrent.duration.FiniteDuration;
73 public class LeaderTest extends AbstractLeaderTest<Leader> {
75 static final String FOLLOWER_ID = "follower";
76 public static final String LEADER_ID = "leader";
78 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
79 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
81 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
82 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
84 private Leader leader;
85 private final short payloadVersion = 5;
89 public void tearDown() throws Exception {
98 public void testHandleMessageForUnknownMessage() throws Exception {
99 logStart("testHandleMessageForUnknownMessage");
101 leader = new Leader(createActorContext());
103 // handle message should null when it receives an unknown message
104 assertNull(leader.handleMessage(followerActor, "foo"));
108 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
109 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
111 MockRaftActorContext actorContext = createActorContextWithFollower();
112 actorContext.setCommitIndex(-1);
113 actorContext.setPayloadVersion(payloadVersion);
116 actorContext.getTermInformation().update(term, "");
118 leader = new Leader(actorContext);
119 actorContext.setCurrentBehavior(leader);
121 // Leader should send an immediate heartbeat with no entries as follower is inactive.
122 final long lastIndex = actorContext.getReplicatedLog().lastIndex();
123 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
124 assertEquals("getTerm", term, appendEntries.getTerm());
125 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
126 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
127 assertEquals("Entries size", 0, appendEntries.getEntries().size());
128 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
130 // The follower would normally reply - simulate that explicitly here.
131 leader.handleMessage(followerActor, new AppendEntriesReply(
132 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
133 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
135 followerActor.underlyingActor().clear();
137 // Sleep for the heartbeat interval so AppendEntries is sent.
138 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
139 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
141 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
143 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
144 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
145 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
146 assertEquals("Entries size", 1, appendEntries.getEntries().size());
147 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
148 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
149 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
153 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
154 return sendReplicate(actorContext, 1, index);
157 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
158 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
159 SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
160 actorContext.getReplicatedLog().append(newEntry);
161 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
165 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
166 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
168 MockRaftActorContext actorContext = createActorContextWithFollower();
171 actorContext.getTermInformation().update(term, "");
173 leader = new Leader(actorContext);
175 // Leader will send an immediate heartbeat - ignore it.
176 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
178 // The follower would normally reply - simulate that explicitly here.
179 long lastIndex = actorContext.getReplicatedLog().lastIndex();
180 leader.handleMessage(followerActor, new AppendEntriesReply(
181 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
182 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
184 followerActor.underlyingActor().clear();
186 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
188 // State should not change
189 assertTrue(raftBehavior instanceof Leader);
191 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
192 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
193 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
194 assertEquals("Entries size", 1, appendEntries.getEntries().size());
195 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
196 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
197 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
198 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
202 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
203 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
205 MockRaftActorContext actorContext = createActorContextWithFollower();
206 actorContext.setCommitIndex(-1);
207 actorContext.setLastApplied(-1);
209 // The raft context is initialized with a couple log entries. However the commitIndex
210 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
211 // committed and applied. Now it regains leadership with a higher term (2).
212 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
213 long newTerm = prevTerm + 1;
214 actorContext.getTermInformation().update(newTerm, "");
216 leader = new Leader(actorContext);
217 actorContext.setCurrentBehavior(leader);
219 // Leader will send an immediate heartbeat - ignore it.
220 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
222 // The follower replies with the leader's current last index and term, simulating that it is
223 // up to date with the leader.
224 long lastIndex = actorContext.getReplicatedLog().lastIndex();
225 leader.handleMessage(followerActor, new AppendEntriesReply(
226 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
228 // The commit index should not get updated even though consensus was reached. This is b/c the
229 // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
230 // from previous terms by counting replicas".
231 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
233 followerActor.underlyingActor().clear();
235 // Now replicate a new entry with the new term 2.
236 long newIndex = lastIndex + 1;
237 sendReplicate(actorContext, newTerm, newIndex);
239 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
240 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
241 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
242 assertEquals("Entries size", 1, appendEntries.getEntries().size());
243 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
244 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
245 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
247 // The follower replies with success. The leader should now update the commit index to the new index
248 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
249 // prior entries are committed indirectly".
250 leader.handleMessage(followerActor, new AppendEntriesReply(
251 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
253 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
257 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
258 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
260 MockRaftActorContext actorContext = createActorContextWithFollower();
261 actorContext.setRaftPolicy(createRaftPolicy(true, true));
264 actorContext.getTermInformation().update(term, "");
266 leader = new Leader(actorContext);
268 // Leader will send an immediate heartbeat - ignore it.
269 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
271 // The follower would normally reply - simulate that explicitly here.
272 long lastIndex = actorContext.getReplicatedLog().lastIndex();
273 leader.handleMessage(followerActor, new AppendEntriesReply(
274 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
275 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
277 followerActor.underlyingActor().clear();
279 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
281 // State should not change
282 assertTrue(raftBehavior instanceof Leader);
284 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
285 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
286 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
287 assertEquals("Entries size", 1, appendEntries.getEntries().size());
288 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
289 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
290 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
291 assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
295 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
296 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
298 MockRaftActorContext actorContext = createActorContextWithFollower();
299 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
301 public FiniteDuration getHeartBeatInterval() {
302 return FiniteDuration.apply(5, TimeUnit.SECONDS);
307 actorContext.getTermInformation().update(term, "");
309 leader = new Leader(actorContext);
311 // Leader will send an immediate heartbeat - ignore it.
312 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
314 // The follower would normally reply - simulate that explicitly here.
315 long lastIndex = actorContext.getReplicatedLog().lastIndex();
316 leader.handleMessage(followerActor, new AppendEntriesReply(
317 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
318 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
320 followerActor.underlyingActor().clear();
322 for (int i = 0; i < 5; i++) {
323 sendReplicate(actorContext, lastIndex + i + 1);
326 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
327 // We expect only 1 message to be sent because of two reasons,
328 // - an append entries reply was not received
329 // - the heartbeat interval has not expired
330 // In this scenario if multiple messages are sent they would likely be duplicates
331 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
335 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
336 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
338 MockRaftActorContext actorContext = createActorContextWithFollower();
339 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
341 public FiniteDuration getHeartBeatInterval() {
342 return FiniteDuration.apply(5, TimeUnit.SECONDS);
347 actorContext.getTermInformation().update(term, "");
349 leader = new Leader(actorContext);
351 // Leader will send an immediate heartbeat - ignore it.
352 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
354 // The follower would normally reply - simulate that explicitly here.
355 long lastIndex = actorContext.getReplicatedLog().lastIndex();
356 leader.handleMessage(followerActor, new AppendEntriesReply(
357 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
358 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
360 followerActor.underlyingActor().clear();
362 for (int i = 0; i < 3; i++) {
363 sendReplicate(actorContext, lastIndex + i + 1);
364 leader.handleMessage(followerActor, new AppendEntriesReply(
365 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
369 for (int i = 3; i < 5; i++) {
370 sendReplicate(actorContext, lastIndex + i + 1);
373 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
374 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
375 // get sent to the follower - but not the 5th
376 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
378 for (int i = 0; i < 4; i++) {
379 long expected = allMessages.get(i).getEntries().get(0).getIndex();
380 assertEquals(expected, i + 2);
385 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
386 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
388 MockRaftActorContext actorContext = createActorContextWithFollower();
389 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
391 public FiniteDuration getHeartBeatInterval() {
392 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
397 actorContext.getTermInformation().update(term, "");
399 leader = new Leader(actorContext);
401 // Leader will send an immediate heartbeat - ignore it.
402 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
404 // The follower would normally reply - simulate that explicitly here.
405 long lastIndex = actorContext.getReplicatedLog().lastIndex();
406 leader.handleMessage(followerActor, new AppendEntriesReply(
407 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
408 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
410 followerActor.underlyingActor().clear();
412 sendReplicate(actorContext, lastIndex + 1);
414 // Wait slightly longer than heartbeat duration
415 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
417 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
419 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
420 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
422 assertEquals(1, allMessages.get(0).getEntries().size());
423 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
424 assertEquals(1, allMessages.get(1).getEntries().size());
425 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
430 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
431 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
433 MockRaftActorContext actorContext = createActorContextWithFollower();
434 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
436 public FiniteDuration getHeartBeatInterval() {
437 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
442 actorContext.getTermInformation().update(term, "");
444 leader = new Leader(actorContext);
446 // Leader will send an immediate heartbeat - ignore it.
447 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
449 // The follower would normally reply - simulate that explicitly here.
450 long lastIndex = actorContext.getReplicatedLog().lastIndex();
451 leader.handleMessage(followerActor, new AppendEntriesReply(
452 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
453 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
455 followerActor.underlyingActor().clear();
457 for (int i = 0; i < 3; i++) {
458 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
459 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
462 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
463 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
467 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
468 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
470 MockRaftActorContext actorContext = createActorContextWithFollower();
471 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
473 public FiniteDuration getHeartBeatInterval() {
474 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
479 actorContext.getTermInformation().update(term, "");
481 leader = new Leader(actorContext);
483 // Leader will send an immediate heartbeat - ignore it.
484 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
486 // The follower would normally reply - simulate that explicitly here.
487 long lastIndex = actorContext.getReplicatedLog().lastIndex();
488 leader.handleMessage(followerActor, new AppendEntriesReply(
489 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
490 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
492 followerActor.underlyingActor().clear();
494 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
495 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
496 sendReplicate(actorContext, lastIndex + 1);
498 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
499 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
501 assertEquals(0, allMessages.get(0).getEntries().size());
502 assertEquals(1, allMessages.get(1).getEntries().size());
507 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
508 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
510 MockRaftActorContext actorContext = createActorContext();
512 leader = new Leader(actorContext);
514 actorContext.setLastApplied(0);
516 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
517 long term = actorContext.getTermInformation().getCurrentTerm();
518 ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
519 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
521 actorContext.getReplicatedLog().append(newEntry);
523 final Identifier id = new MockIdentifier("state-id");
524 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
525 new Replicate(leaderActor, id, newEntry, true));
527 // State should not change
528 assertTrue(raftBehavior instanceof Leader);
530 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
532 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
533 // one since lastApplied state is 0.
534 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
535 leaderActor, ApplyState.class);
536 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
538 for (int i = 0; i <= newLogIndex - 1; i++ ) {
539 ApplyState applyState = applyStateList.get(i);
540 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
541 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
544 ApplyState last = applyStateList.get((int) newLogIndex - 1);
545 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
546 assertEquals("getIdentifier", id, last.getIdentifier());
550 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
551 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
553 final MockRaftActorContext actorContext = createActorContextWithFollower();
555 Map<String, String> leadersSnapshot = new HashMap<>();
556 leadersSnapshot.put("1", "A");
557 leadersSnapshot.put("2", "B");
558 leadersSnapshot.put("3", "C");
561 actorContext.getReplicatedLog().removeFrom(0);
563 final int commitIndex = 3;
564 final int snapshotIndex = 2;
565 final int snapshotTerm = 1;
567 // set the snapshot variables in replicatedlog
568 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
569 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
570 actorContext.setCommitIndex(commitIndex);
571 //set follower timeout to 2 mins, helps during debugging
572 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
574 leader = new Leader(actorContext);
576 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
577 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
579 //update follower timestamp
580 leader.markFollowerActive(FOLLOWER_ID);
582 ByteString bs = toByteString(leadersSnapshot);
583 leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
584 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
585 -1, null, null), ByteSource.wrap(bs.toByteArray())));
586 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
587 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
588 fts.setSnapshotBytes(bs);
589 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
591 //send first chunk and no InstallSnapshotReply received yet
593 fts.incrementChunkIndex();
595 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
596 TimeUnit.MILLISECONDS);
598 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
600 AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
602 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
604 //InstallSnapshotReply received
605 fts.markSendStatus(true);
607 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
609 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
611 assertEquals(commitIndex, is.getLastIncludedIndex());
615 public void testSendAppendEntriesSnapshotScenario() throws Exception {
616 logStart("testSendAppendEntriesSnapshotScenario");
618 final MockRaftActorContext actorContext = createActorContextWithFollower();
620 Map<String, String> leadersSnapshot = new HashMap<>();
621 leadersSnapshot.put("1", "A");
622 leadersSnapshot.put("2", "B");
623 leadersSnapshot.put("3", "C");
626 actorContext.getReplicatedLog().removeFrom(0);
628 final int followersLastIndex = 2;
629 final int snapshotIndex = 3;
630 final int newEntryIndex = 4;
631 final int snapshotTerm = 1;
632 final int currentTerm = 2;
634 // set the snapshot variables in replicatedlog
635 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
636 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
637 actorContext.setCommitIndex(followersLastIndex);
639 leader = new Leader(actorContext);
641 // Leader will send an immediate heartbeat - ignore it.
642 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
645 SimpleReplicatedLogEntry entry =
646 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
647 new MockRaftActorContext.MockPayload("D"));
649 actorContext.getReplicatedLog().append(entry);
651 //update follower timestamp
652 leader.markFollowerActive(FOLLOWER_ID);
654 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
655 RaftActorBehavior raftBehavior = leader.handleMessage(
656 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
658 assertTrue(raftBehavior instanceof Leader);
660 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
664 public void testInitiateInstallSnapshot() throws Exception {
665 logStart("testInitiateInstallSnapshot");
667 MockRaftActorContext actorContext = createActorContextWithFollower();
670 actorContext.getReplicatedLog().removeFrom(0);
672 final int followersLastIndex = 2;
673 final int snapshotIndex = 3;
674 final int newEntryIndex = 4;
675 final int snapshotTerm = 1;
676 final int currentTerm = 2;
678 // set the snapshot variables in replicatedlog
679 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
680 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
681 actorContext.setLastApplied(3);
682 actorContext.setCommitIndex(followersLastIndex);
684 leader = new Leader(actorContext);
686 // Leader will send an immediate heartbeat - ignore it.
687 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
689 // set the snapshot as absent and check if capture-snapshot is invoked.
690 leader.setSnapshot(null);
693 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
694 new MockRaftActorContext.MockPayload("D"));
696 actorContext.getReplicatedLog().append(entry);
698 //update follower timestamp
699 leader.markFollowerActive(FOLLOWER_ID);
701 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
703 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
705 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
707 assertEquals(3, cs.getLastAppliedIndex());
708 assertEquals(1, cs.getLastAppliedTerm());
709 assertEquals(4, cs.getLastIndex());
710 assertEquals(2, cs.getLastTerm());
712 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
713 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
715 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
719 public void testInitiateForceInstallSnapshot() throws Exception {
720 logStart("testInitiateForceInstallSnapshot");
722 MockRaftActorContext actorContext = createActorContextWithFollower();
724 final int followersLastIndex = 2;
725 final int snapshotIndex = -1;
726 final int newEntryIndex = 4;
727 final int snapshotTerm = -1;
728 final int currentTerm = 2;
730 // set the snapshot variables in replicatedlog
731 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
732 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
733 actorContext.setLastApplied(3);
734 actorContext.setCommitIndex(followersLastIndex);
736 actorContext.getReplicatedLog().removeFrom(0);
738 leader = new Leader(actorContext);
739 actorContext.setCurrentBehavior(leader);
741 // Leader will send an immediate heartbeat - ignore it.
742 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
744 // set the snapshot as absent and check if capture-snapshot is invoked.
745 leader.setSnapshot(null);
747 for (int i = 0; i < 4; i++) {
748 actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
749 new MockRaftActorContext.MockPayload("X" + i)));
753 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
754 new MockRaftActorContext.MockPayload("D"));
756 actorContext.getReplicatedLog().append(entry);
758 //update follower timestamp
759 leader.markFollowerActive(FOLLOWER_ID);
761 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
762 // installed with a SendInstallSnapshot
763 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
765 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
767 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
769 assertEquals(3, cs.getLastAppliedIndex());
770 assertEquals(1, cs.getLastAppliedTerm());
771 assertEquals(4, cs.getLastIndex());
772 assertEquals(2, cs.getLastTerm());
774 // if an initiate is started again when first is in progress, it should not initiate Capture
775 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
777 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
782 public void testInstallSnapshot() throws Exception {
783 logStart("testInstallSnapshot");
785 final MockRaftActorContext actorContext = createActorContextWithFollower();
787 Map<String, String> leadersSnapshot = new HashMap<>();
788 leadersSnapshot.put("1", "A");
789 leadersSnapshot.put("2", "B");
790 leadersSnapshot.put("3", "C");
793 actorContext.getReplicatedLog().removeFrom(0);
795 final int lastAppliedIndex = 3;
796 final int snapshotIndex = 2;
797 final int snapshotTerm = 1;
798 final int currentTerm = 2;
800 // set the snapshot variables in replicatedlog
801 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
802 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
803 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
804 actorContext.setCommitIndex(lastAppliedIndex);
805 actorContext.setLastApplied(lastAppliedIndex);
807 leader = new Leader(actorContext);
809 // Initial heartbeat.
810 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
812 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
813 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
815 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
816 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
817 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
819 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
820 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
822 assertTrue(raftBehavior instanceof Leader);
824 // check if installsnapshot gets called with the correct values.
826 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
827 InstallSnapshot.class);
829 assertNotNull(installSnapshot.getData());
830 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
831 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
833 assertEquals(currentTerm, installSnapshot.getTerm());
837 public void testForceInstallSnapshot() throws Exception {
838 logStart("testForceInstallSnapshot");
840 final MockRaftActorContext actorContext = createActorContextWithFollower();
842 Map<String, String> leadersSnapshot = new HashMap<>();
843 leadersSnapshot.put("1", "A");
844 leadersSnapshot.put("2", "B");
845 leadersSnapshot.put("3", "C");
847 final int lastAppliedIndex = 3;
848 final int snapshotIndex = -1;
849 final int snapshotTerm = -1;
850 final int currentTerm = 2;
852 // set the snapshot variables in replicatedlog
853 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
854 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
855 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
856 actorContext.setCommitIndex(lastAppliedIndex);
857 actorContext.setLastApplied(lastAppliedIndex);
859 leader = new Leader(actorContext);
861 // Initial heartbeat.
862 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
864 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
865 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
867 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
868 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
869 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
871 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
872 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
874 assertTrue(raftBehavior instanceof Leader);
876 // check if installsnapshot gets called with the correct values.
878 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
879 InstallSnapshot.class);
881 assertNotNull(installSnapshot.getData());
882 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
883 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
885 assertEquals(currentTerm, installSnapshot.getTerm());
889 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
890 logStart("testHandleInstallSnapshotReplyLastChunk");
892 MockRaftActorContext actorContext = createActorContextWithFollower();
894 final int commitIndex = 3;
895 final int snapshotIndex = 2;
896 final int snapshotTerm = 1;
897 final int currentTerm = 2;
899 actorContext.setCommitIndex(commitIndex);
901 leader = new Leader(actorContext);
902 actorContext.setCurrentBehavior(leader);
904 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
905 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
907 // Ignore initial heartbeat.
908 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
910 Map<String, String> leadersSnapshot = new HashMap<>();
911 leadersSnapshot.put("1", "A");
912 leadersSnapshot.put("2", "B");
913 leadersSnapshot.put("3", "C");
915 // set the snapshot variables in replicatedlog
917 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
918 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
919 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
921 ByteString bs = toByteString(leadersSnapshot);
922 leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
923 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
924 -1, null, null), ByteSource.wrap(bs.toByteArray())));
925 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
926 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
927 fts.setSnapshotBytes(bs);
928 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
929 while (!fts.isLastChunk(fts.getChunkIndex())) {
931 fts.incrementChunkIndex();
935 actorContext.getReplicatedLog().removeFrom(0);
937 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
938 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
940 assertTrue(raftBehavior instanceof Leader);
942 assertEquals(1, leader.followerLogSize());
943 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
945 assertNull(fli.getInstallSnapshotState());
946 assertEquals(commitIndex, fli.getMatchIndex());
947 assertEquals(commitIndex + 1, fli.getNextIndex());
948 assertFalse(leader.hasSnapshot());
952 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
953 logStart("testSendSnapshotfromInstallSnapshotReply");
955 MockRaftActorContext actorContext = createActorContextWithFollower();
957 final int commitIndex = 3;
958 final int snapshotIndex = 2;
959 final int snapshotTerm = 1;
960 final int currentTerm = 2;
962 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
964 public int getSnapshotChunkSize() {
968 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
969 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
971 actorContext.setConfigParams(configParams);
972 actorContext.setCommitIndex(commitIndex);
974 leader = new Leader(actorContext);
975 actorContext.setCurrentBehavior(leader);
977 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
978 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
980 Map<String, String> leadersSnapshot = new HashMap<>();
981 leadersSnapshot.put("1", "A");
982 leadersSnapshot.put("2", "B");
983 leadersSnapshot.put("3", "C");
985 // set the snapshot variables in replicatedlog
986 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
987 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
988 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
990 ByteString bs = toByteString(leadersSnapshot);
991 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
992 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
995 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
997 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
998 InstallSnapshot.class);
1000 assertEquals(1, installSnapshot.getChunkIndex());
1001 assertEquals(3, installSnapshot.getTotalChunks());
1003 followerActor.underlyingActor().clear();
1004 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1005 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1007 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1009 assertEquals(2, installSnapshot.getChunkIndex());
1010 assertEquals(3, installSnapshot.getTotalChunks());
1012 followerActor.underlyingActor().clear();
1013 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1014 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1016 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1018 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1019 followerActor.underlyingActor().clear();
1020 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1021 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1023 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1025 assertNull(installSnapshot);
1030 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
1031 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1033 MockRaftActorContext actorContext = createActorContextWithFollower();
1035 final int commitIndex = 3;
1036 final int snapshotIndex = 2;
1037 final int snapshotTerm = 1;
1038 final int currentTerm = 2;
1040 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1042 public int getSnapshotChunkSize() {
1047 actorContext.setCommitIndex(commitIndex);
1049 leader = new Leader(actorContext);
1051 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1052 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1054 Map<String, String> leadersSnapshot = new HashMap<>();
1055 leadersSnapshot.put("1", "A");
1056 leadersSnapshot.put("2", "B");
1057 leadersSnapshot.put("3", "C");
1059 // set the snapshot variables in replicatedlog
1060 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1061 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1062 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1064 ByteString bs = toByteString(leadersSnapshot);
1065 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1066 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1069 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1070 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1072 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1073 InstallSnapshot.class);
1075 assertEquals(1, installSnapshot.getChunkIndex());
1076 assertEquals(3, installSnapshot.getTotalChunks());
1078 followerActor.underlyingActor().clear();
1080 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1081 FOLLOWER_ID, -1, false));
1083 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1084 TimeUnit.MILLISECONDS);
1086 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1088 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1090 assertEquals(1, installSnapshot.getChunkIndex());
1091 assertEquals(3, installSnapshot.getTotalChunks());
1095 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1096 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1098 MockRaftActorContext actorContext = createActorContextWithFollower();
1100 final int commitIndex = 3;
1101 final int snapshotIndex = 2;
1102 final int snapshotTerm = 1;
1103 final int currentTerm = 2;
1105 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1107 public int getSnapshotChunkSize() {
1112 actorContext.setCommitIndex(commitIndex);
1114 leader = new Leader(actorContext);
1116 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1117 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1119 Map<String, String> leadersSnapshot = new HashMap<>();
1120 leadersSnapshot.put("1", "A");
1121 leadersSnapshot.put("2", "B");
1122 leadersSnapshot.put("3", "C");
1124 // set the snapshot variables in replicatedlog
1125 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1126 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1127 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1129 ByteString bs = toByteString(leadersSnapshot);
1130 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1131 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1134 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1136 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1137 InstallSnapshot.class);
1139 assertEquals(1, installSnapshot.getChunkIndex());
1140 assertEquals(3, installSnapshot.getTotalChunks());
1141 assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1142 installSnapshot.getLastChunkHashCode().get().intValue());
1144 final int hashCode = Arrays.hashCode(installSnapshot.getData());
1146 followerActor.underlyingActor().clear();
1148 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1149 FOLLOWER_ID, 1, true));
1151 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1153 assertEquals(2, installSnapshot.getChunkIndex());
1154 assertEquals(3, installSnapshot.getTotalChunks());
1155 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1159 public void testLeaderInstallSnapshotState() {
1160 logStart("testLeaderInstallSnapshotState");
1162 Map<String, String> leadersSnapshot = new HashMap<>();
1163 leadersSnapshot.put("1", "A");
1164 leadersSnapshot.put("2", "B");
1165 leadersSnapshot.put("3", "C");
1167 ByteString bs = toByteString(leadersSnapshot);
1168 byte[] barray = bs.toByteArray();
1170 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1171 fts.setSnapshotBytes(bs);
1173 assertEquals(bs.size(), barray.length);
1176 for (int i = 0; i < barray.length; i = i + 50) {
1177 int length = i + 50;
1180 if (i + 50 > barray.length) {
1181 length = barray.length;
1184 byte[] chunk = fts.getNextChunk();
1185 assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1186 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1188 fts.markSendStatus(true);
1189 if (!fts.isLastChunk(chunkIndex)) {
1190 fts.incrementChunkIndex();
1194 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1198 protected Leader createBehavior(final RaftActorContext actorContext) {
1199 return new Leader(actorContext);
1203 protected MockRaftActorContext createActorContext() {
1204 return createActorContext(leaderActor);
1208 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1209 return createActorContext(LEADER_ID, actorRef);
1212 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1213 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1214 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1215 configParams.setElectionTimeoutFactor(100000);
1216 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1217 context.setConfigParams(configParams);
1218 context.setPayloadVersion(payloadVersion);
1222 private MockRaftActorContext createActorContextWithFollower() {
1223 MockRaftActorContext actorContext = createActorContext();
1224 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1225 followerActor.path().toString()).build());
1226 return actorContext;
1229 private MockRaftActorContext createFollowerActorContextWithLeader() {
1230 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1231 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1232 followerConfig.setElectionTimeoutFactor(10000);
1233 followerActorContext.setConfigParams(followerConfig);
1234 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1235 return followerActorContext;
1239 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1240 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1242 final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1244 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1246 Follower follower = new Follower(followerActorContext);
1247 followerActor.underlyingActor().setBehavior(follower);
1248 followerActorContext.setCurrentBehavior(follower);
1250 Map<String, String> peerAddresses = new HashMap<>();
1251 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1253 leaderActorContext.setPeerAddresses(peerAddresses);
1255 leaderActorContext.getReplicatedLog().removeFrom(0);
1258 leaderActorContext.setReplicatedLog(
1259 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1261 leaderActorContext.setCommitIndex(1);
1263 followerActorContext.getReplicatedLog().removeFrom(0);
1265 // follower too has the exact same log entries and has the same commit index
1266 followerActorContext.setReplicatedLog(
1267 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1269 followerActorContext.setCommitIndex(1);
1271 leader = new Leader(leaderActorContext);
1272 leaderActorContext.setCurrentBehavior(leader);
1274 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1276 assertEquals(-1, appendEntries.getLeaderCommit());
1277 assertEquals(0, appendEntries.getEntries().size());
1278 assertEquals(0, appendEntries.getPrevLogIndex());
1280 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1281 leaderActor, AppendEntriesReply.class);
1283 assertEquals(2, appendEntriesReply.getLogLastIndex());
1284 assertEquals(1, appendEntriesReply.getLogLastTerm());
1286 // follower returns its next index
1287 assertEquals(2, appendEntriesReply.getLogLastIndex());
1288 assertEquals(1, appendEntriesReply.getLogLastTerm());
1294 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1295 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1297 final MockRaftActorContext leaderActorContext = createActorContext();
1299 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1300 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1302 Follower follower = new Follower(followerActorContext);
1303 followerActor.underlyingActor().setBehavior(follower);
1304 followerActorContext.setCurrentBehavior(follower);
1306 Map<String, String> leaderPeerAddresses = new HashMap<>();
1307 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1309 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1311 leaderActorContext.getReplicatedLog().removeFrom(0);
1313 leaderActorContext.setReplicatedLog(
1314 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1316 leaderActorContext.setCommitIndex(1);
1318 followerActorContext.getReplicatedLog().removeFrom(0);
1320 followerActorContext.setReplicatedLog(
1321 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1323 // follower has the same log entries but its commit index > leaders commit index
1324 followerActorContext.setCommitIndex(2);
1326 leader = new Leader(leaderActorContext);
1328 // Initial heartbeat
1329 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1331 assertEquals(-1, appendEntries.getLeaderCommit());
1332 assertEquals(0, appendEntries.getEntries().size());
1333 assertEquals(0, appendEntries.getPrevLogIndex());
1335 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1336 leaderActor, AppendEntriesReply.class);
1338 assertEquals(2, appendEntriesReply.getLogLastIndex());
1339 assertEquals(1, appendEntriesReply.getLogLastTerm());
1341 leaderActor.underlyingActor().setBehavior(follower);
1342 leader.handleMessage(followerActor, appendEntriesReply);
1344 leaderActor.underlyingActor().clear();
1345 followerActor.underlyingActor().clear();
1347 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1348 TimeUnit.MILLISECONDS);
1350 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1352 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1354 assertEquals(2, appendEntries.getLeaderCommit());
1355 assertEquals(0, appendEntries.getEntries().size());
1356 assertEquals(2, appendEntries.getPrevLogIndex());
1358 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1360 assertEquals(2, appendEntriesReply.getLogLastIndex());
1361 assertEquals(1, appendEntriesReply.getLogLastTerm());
1363 assertEquals(2, followerActorContext.getCommitIndex());
1369 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1370 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1372 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1373 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1374 new FiniteDuration(1000, TimeUnit.SECONDS));
1376 leaderActorContext.setReplicatedLog(
1377 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1378 long leaderCommitIndex = 2;
1379 leaderActorContext.setCommitIndex(leaderCommitIndex);
1380 leaderActorContext.setLastApplied(leaderCommitIndex);
1382 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1383 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1385 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1387 followerActorContext.setReplicatedLog(
1388 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1389 followerActorContext.setCommitIndex(0);
1390 followerActorContext.setLastApplied(0);
1392 Follower follower = new Follower(followerActorContext);
1393 followerActor.underlyingActor().setBehavior(follower);
1395 leader = new Leader(leaderActorContext);
1397 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1398 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1399 AppendEntriesReply.class);
1401 MessageCollectorActor.clearMessages(followerActor);
1402 MessageCollectorActor.clearMessages(leaderActor);
1404 // Verify initial AppendEntries sent.
1405 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1406 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1407 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1409 leaderActor.underlyingActor().setBehavior(leader);
1411 leader.handleMessage(followerActor, appendEntriesReply);
1413 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1414 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1416 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1417 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1418 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1420 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1421 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1422 appendEntries.getEntries().get(0).getData());
1423 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1424 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1425 appendEntries.getEntries().get(1).getData());
1427 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1428 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1430 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1432 ApplyState applyState = applyStateList.get(0);
1433 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1434 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1435 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1436 applyState.getReplicatedLogEntry().getData());
1438 applyState = applyStateList.get(1);
1439 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1440 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1441 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1442 applyState.getReplicatedLogEntry().getData());
1444 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1445 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1449 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1450 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1452 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1453 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1454 new FiniteDuration(1000, TimeUnit.SECONDS));
1456 leaderActorContext.setReplicatedLog(
1457 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1458 long leaderCommitIndex = 1;
1459 leaderActorContext.setCommitIndex(leaderCommitIndex);
1460 leaderActorContext.setLastApplied(leaderCommitIndex);
1462 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1463 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1465 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1467 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1468 followerActorContext.setCommitIndex(-1);
1469 followerActorContext.setLastApplied(-1);
1471 Follower follower = new Follower(followerActorContext);
1472 followerActor.underlyingActor().setBehavior(follower);
1473 followerActorContext.setCurrentBehavior(follower);
1475 leader = new Leader(leaderActorContext);
1477 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1478 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1479 AppendEntriesReply.class);
1481 MessageCollectorActor.clearMessages(followerActor);
1482 MessageCollectorActor.clearMessages(leaderActor);
1484 // Verify initial AppendEntries sent with the leader's current commit index.
1485 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1486 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1487 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1489 leaderActor.underlyingActor().setBehavior(leader);
1490 leaderActorContext.setCurrentBehavior(leader);
1492 leader.handleMessage(followerActor, appendEntriesReply);
1494 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1495 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1497 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1498 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1499 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1501 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1502 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1503 appendEntries.getEntries().get(0).getData());
1504 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1505 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1506 appendEntries.getEntries().get(1).getData());
1508 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1509 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1511 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1513 ApplyState applyState = applyStateList.get(0);
1514 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1515 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1516 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1517 applyState.getReplicatedLogEntry().getData());
1519 applyState = applyStateList.get(1);
1520 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1521 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1522 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1523 applyState.getReplicatedLogEntry().getData());
1525 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1526 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1530 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1531 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1533 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1534 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1535 new FiniteDuration(1000, TimeUnit.SECONDS));
1537 leaderActorContext.setReplicatedLog(
1538 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1539 long leaderCommitIndex = 1;
1540 leaderActorContext.setCommitIndex(leaderCommitIndex);
1541 leaderActorContext.setLastApplied(leaderCommitIndex);
1543 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1544 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1546 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1548 followerActorContext.setReplicatedLog(
1549 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1550 followerActorContext.setCommitIndex(-1);
1551 followerActorContext.setLastApplied(-1);
1553 Follower follower = new Follower(followerActorContext);
1554 followerActor.underlyingActor().setBehavior(follower);
1555 followerActorContext.setCurrentBehavior(follower);
1557 leader = new Leader(leaderActorContext);
1559 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1560 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1561 AppendEntriesReply.class);
1563 MessageCollectorActor.clearMessages(followerActor);
1564 MessageCollectorActor.clearMessages(leaderActor);
1566 // Verify initial AppendEntries sent with the leader's current commit index.
1567 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1568 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1569 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1571 leaderActor.underlyingActor().setBehavior(leader);
1572 leaderActorContext.setCurrentBehavior(leader);
1574 leader.handleMessage(followerActor, appendEntriesReply);
1576 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1577 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1579 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1580 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1581 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1583 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1584 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1585 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1586 appendEntries.getEntries().get(0).getData());
1587 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1588 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1589 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1590 appendEntries.getEntries().get(1).getData());
1592 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1593 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1595 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1597 ApplyState applyState = applyStateList.get(0);
1598 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1599 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1600 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1601 applyState.getReplicatedLogEntry().getData());
1603 applyState = applyStateList.get(1);
1604 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1605 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1606 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1607 applyState.getReplicatedLogEntry().getData());
1609 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1610 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1611 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1615 public void testHandleAppendEntriesReplyWithNewerTerm() {
1616 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1618 MockRaftActorContext leaderActorContext = createActorContext();
1619 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1620 new FiniteDuration(10000, TimeUnit.SECONDS));
1622 leaderActorContext.setReplicatedLog(
1623 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1625 leader = new Leader(leaderActorContext);
1626 leaderActor.underlyingActor().setBehavior(leader);
1627 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1629 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1630 AppendEntriesReply.class);
1632 assertEquals(false, appendEntriesReply.isSuccess());
1633 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1635 MessageCollectorActor.clearMessages(leaderActor);
1639 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1640 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1642 MockRaftActorContext leaderActorContext = createActorContext();
1643 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1644 new FiniteDuration(10000, TimeUnit.SECONDS));
1646 leaderActorContext.setReplicatedLog(
1647 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1648 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1650 leader = new Leader(leaderActorContext);
1651 leaderActor.underlyingActor().setBehavior(leader);
1652 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1654 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1655 AppendEntriesReply.class);
1657 assertEquals(false, appendEntriesReply.isSuccess());
1658 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1660 MessageCollectorActor.clearMessages(leaderActor);
1664 public void testHandleAppendEntriesReplySuccess() throws Exception {
1665 logStart("testHandleAppendEntriesReplySuccess");
1667 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1669 leaderActorContext.setReplicatedLog(
1670 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1672 leaderActorContext.setCommitIndex(1);
1673 leaderActorContext.setLastApplied(1);
1674 leaderActorContext.getTermInformation().update(1, "leader");
1676 leader = new Leader(leaderActorContext);
1678 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1680 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1681 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1683 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1685 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1687 assertEquals(RaftState.Leader, raftActorBehavior.state());
1689 assertEquals(2, leaderActorContext.getCommitIndex());
1691 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1692 leaderActor, ApplyJournalEntries.class);
1694 assertEquals(2, leaderActorContext.getLastApplied());
1696 assertEquals(2, applyJournalEntries.getToIndex());
1698 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1701 assertEquals(1,applyStateList.size());
1703 ApplyState applyState = applyStateList.get(0);
1705 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1707 assertEquals(2, followerInfo.getMatchIndex());
1708 assertEquals(3, followerInfo.getNextIndex());
1709 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1710 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1714 public void testHandleAppendEntriesReplyUnknownFollower() {
1715 logStart("testHandleAppendEntriesReplyUnknownFollower");
1717 MockRaftActorContext leaderActorContext = createActorContext();
1719 leader = new Leader(leaderActorContext);
1721 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1723 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1725 assertEquals(RaftState.Leader, raftActorBehavior.state());
1729 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1730 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1732 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1733 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1734 new FiniteDuration(1000, TimeUnit.SECONDS));
1735 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1737 leaderActorContext.setReplicatedLog(
1738 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1739 long leaderCommitIndex = 3;
1740 leaderActorContext.setCommitIndex(leaderCommitIndex);
1741 leaderActorContext.setLastApplied(leaderCommitIndex);
1743 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1744 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1745 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1746 final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1748 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1750 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1751 followerActorContext.setCommitIndex(-1);
1752 followerActorContext.setLastApplied(-1);
1754 Follower follower = new Follower(followerActorContext);
1755 followerActor.underlyingActor().setBehavior(follower);
1756 followerActorContext.setCurrentBehavior(follower);
1758 leader = new Leader(leaderActorContext);
1760 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1761 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1762 AppendEntriesReply.class);
1764 MessageCollectorActor.clearMessages(followerActor);
1765 MessageCollectorActor.clearMessages(leaderActor);
1767 // Verify initial AppendEntries sent with the leader's current commit index.
1768 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1769 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1770 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1772 leaderActor.underlyingActor().setBehavior(leader);
1773 leaderActorContext.setCurrentBehavior(leader);
1775 leader.handleMessage(followerActor, appendEntriesReply);
1777 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1778 AppendEntries.class, 2);
1779 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1781 appendEntries = appendEntriesList.get(0);
1782 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1783 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1784 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1786 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1787 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1788 appendEntries.getEntries().get(0).getData());
1789 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1790 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1791 appendEntries.getEntries().get(1).getData());
1793 appendEntries = appendEntriesList.get(1);
1794 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1795 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1796 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1798 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1799 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1800 appendEntries.getEntries().get(0).getData());
1801 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1802 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1803 appendEntries.getEntries().get(1).getData());
1805 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1806 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1808 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1810 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1811 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1815 public void testHandleRequestVoteReply() {
1816 logStart("testHandleRequestVoteReply");
1818 MockRaftActorContext leaderActorContext = createActorContext();
1820 leader = new Leader(leaderActorContext);
1822 // Should be a no-op.
1823 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1824 new RequestVoteReply(1, true));
1826 assertEquals(RaftState.Leader, raftActorBehavior.state());
1828 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1830 assertEquals(RaftState.Leader, raftActorBehavior.state());
1834 public void testIsolatedLeaderCheckNoFollowers() {
1835 logStart("testIsolatedLeaderCheckNoFollowers");
1837 MockRaftActorContext leaderActorContext = createActorContext();
1839 leader = new Leader(leaderActorContext);
1840 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1841 assertTrue(newBehavior instanceof Leader);
1845 public void testIsolatedLeaderCheckNoVotingFollowers() {
1846 logStart("testIsolatedLeaderCheckNoVotingFollowers");
1848 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1849 Follower follower = new Follower(followerActorContext);
1850 followerActor.underlyingActor().setBehavior(follower);
1852 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1853 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1854 new FiniteDuration(1000, TimeUnit.SECONDS));
1855 leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1857 leader = new Leader(leaderActorContext);
1858 leader.getFollower(FOLLOWER_ID).markFollowerActive();
1859 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1860 assertTrue("Expected Leader", newBehavior instanceof Leader);
1863 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
1864 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1865 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1867 MockRaftActorContext leaderActorContext = createActorContext();
1869 Map<String, String> peerAddresses = new HashMap<>();
1870 peerAddresses.put("follower-1", followerActor1.path().toString());
1871 peerAddresses.put("follower-2", followerActor2.path().toString());
1873 leaderActorContext.setPeerAddresses(peerAddresses);
1874 leaderActorContext.setRaftPolicy(raftPolicy);
1876 leader = new Leader(leaderActorContext);
1878 leader.markFollowerActive("follower-1");
1879 leader.markFollowerActive("follower-2");
1880 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1881 assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1883 // kill 1 follower and verify if that got killed
1884 final JavaTestKit probe = new JavaTestKit(getSystem());
1885 probe.watch(followerActor1);
1886 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1887 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1888 assertEquals(termMsg1.getActor(), followerActor1);
1890 leader.markFollowerInActive("follower-1");
1891 leader.markFollowerActive("follower-2");
1892 newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1893 assertTrue("Behavior not instance of Leader when majority of followers are active",
1894 newBehavior instanceof Leader);
1896 // kill 2nd follower and leader should change to Isolated leader
1897 followerActor2.tell(PoisonPill.getInstance(), null);
1898 probe.watch(followerActor2);
1899 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1900 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1901 assertEquals(termMsg2.getActor(), followerActor2);
1903 leader.markFollowerInActive("follower-2");
1904 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1908 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1909 logStart("testIsolatedLeaderCheckTwoFollowers");
1911 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1913 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1914 newBehavior instanceof IsolatedLeader);
1918 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1919 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1921 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1923 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1924 newBehavior instanceof Leader);
1928 public void testLaggingFollowerStarvation() throws Exception {
1929 logStart("testLaggingFollowerStarvation");
1931 String leaderActorId = actorFactory.generateActorId("leader");
1932 String follower1ActorId = actorFactory.generateActorId("follower");
1933 String follower2ActorId = actorFactory.generateActorId("follower");
1935 final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1936 final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1938 MockRaftActorContext leaderActorContext =
1939 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1941 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1942 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1943 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1945 leaderActorContext.setConfigParams(configParams);
1947 leaderActorContext.setReplicatedLog(
1948 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1950 Map<String, String> peerAddresses = new HashMap<>();
1951 peerAddresses.put(follower1ActorId,
1952 follower1Actor.path().toString());
1953 peerAddresses.put(follower2ActorId,
1954 follower2Actor.path().toString());
1956 leaderActorContext.setPeerAddresses(peerAddresses);
1957 leaderActorContext.getTermInformation().update(1, leaderActorId);
1959 leader = createBehavior(leaderActorContext);
1961 leaderActor.underlyingActor().setBehavior(leader);
1963 for (int i = 1; i < 6; i++) {
1964 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1965 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
1966 new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1967 assertTrue(newBehavior == leader);
1968 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1971 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1972 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1974 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1975 heartbeats.size() > 1);
1977 // Check if follower-2 got AppendEntries during this time and was not starved
1978 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1980 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1981 appendEntries.size() > 1);
1985 public void testReplicationConsensusWithNonVotingFollower() {
1986 logStart("testReplicationConsensusWithNonVotingFollower");
1988 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1989 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1990 new FiniteDuration(1000, TimeUnit.SECONDS));
1992 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1993 leaderActorContext.setCommitIndex(-1);
1994 leaderActorContext.setLastApplied(-1);
1996 String nonVotingFollowerId = "nonvoting-follower";
1997 TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1998 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
2000 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2001 VotingState.NON_VOTING);
2003 leader = new Leader(leaderActorContext);
2004 leaderActorContext.setCurrentBehavior(leader);
2006 // Ignore initial heartbeats
2007 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2008 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2010 MessageCollectorActor.clearMessages(followerActor);
2011 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2012 MessageCollectorActor.clearMessages(leaderActor);
2014 // Send a Replicate message and wait for AppendEntries.
2015 sendReplicate(leaderActorContext, 0);
2017 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2018 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2020 // Send reply only from the voting follower and verify consensus via ApplyState.
2021 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2023 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2025 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2027 MessageCollectorActor.clearMessages(followerActor);
2028 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2029 MessageCollectorActor.clearMessages(leaderActor);
2031 // Send another Replicate message
2032 sendReplicate(leaderActorContext, 1);
2034 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2035 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2036 AppendEntries.class);
2037 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2038 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2040 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2041 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2043 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2045 // Send reply from the voting follower and verify consensus.
2046 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2048 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2052 public void testTransferLeadershipWithFollowerInSync() {
2053 logStart("testTransferLeadershipWithFollowerInSync");
2055 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2056 leaderActorContext.setLastApplied(-1);
2057 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2058 new FiniteDuration(1000, TimeUnit.SECONDS));
2059 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2061 leader = new Leader(leaderActorContext);
2062 leaderActorContext.setCurrentBehavior(leader);
2064 // Initial heartbeat
2065 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2066 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2067 MessageCollectorActor.clearMessages(followerActor);
2069 sendReplicate(leaderActorContext, 0);
2070 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2072 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2073 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2074 MessageCollectorActor.clearMessages(followerActor);
2076 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2077 leader.transferLeadership(mockTransferCohort);
2079 verify(mockTransferCohort, never()).transferComplete();
2080 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2081 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2083 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2084 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2086 // Leader should force an election timeout
2087 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2089 verify(mockTransferCohort).transferComplete();
2093 public void testTransferLeadershipWithEmptyLog() {
2094 logStart("testTransferLeadershipWithEmptyLog");
2096 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2097 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2098 new FiniteDuration(1000, TimeUnit.SECONDS));
2099 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2101 leader = new Leader(leaderActorContext);
2102 leaderActorContext.setCurrentBehavior(leader);
2104 // Initial heartbeat
2105 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2106 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2107 MessageCollectorActor.clearMessages(followerActor);
2109 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2110 leader.transferLeadership(mockTransferCohort);
2112 verify(mockTransferCohort, never()).transferComplete();
2113 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2114 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2116 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2117 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2119 // Leader should force an election timeout
2120 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2122 verify(mockTransferCohort).transferComplete();
2126 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2127 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2129 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2130 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2131 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2133 leader = new Leader(leaderActorContext);
2134 leaderActorContext.setCurrentBehavior(leader);
2136 // Initial heartbeat
2137 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2138 MessageCollectorActor.clearMessages(followerActor);
2140 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2141 leader.transferLeadership(mockTransferCohort);
2143 verify(mockTransferCohort, never()).transferComplete();
2145 // Sync up the follower.
2146 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2147 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2148 MessageCollectorActor.clearMessages(followerActor);
2150 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2151 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2152 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2153 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2154 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2156 // Leader should force an election timeout
2157 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2159 verify(mockTransferCohort).transferComplete();
2163 public void testTransferLeadershipWithFollowerSyncTimeout() {
2164 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2166 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2167 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2168 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2169 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2170 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2172 leader = new Leader(leaderActorContext);
2173 leaderActorContext.setCurrentBehavior(leader);
2175 // Initial heartbeat
2176 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2177 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2178 MessageCollectorActor.clearMessages(followerActor);
2180 sendReplicate(leaderActorContext, 0);
2181 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2183 MessageCollectorActor.clearMessages(followerActor);
2185 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2186 leader.transferLeadership(mockTransferCohort);
2188 verify(mockTransferCohort, never()).transferComplete();
2190 // Send heartbeats to time out the transfer.
2191 for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2192 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2193 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2194 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2197 verify(mockTransferCohort).abortTransfer();
2198 verify(mockTransferCohort, never()).transferComplete();
2199 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2203 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2204 ActorRef actorRef, RaftRPC rpc) throws Exception {
2205 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2206 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2209 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2211 private final long electionTimeOutIntervalMillis;
2212 private final int snapshotChunkSize;
2214 MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2216 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2217 this.snapshotChunkSize = snapshotChunkSize;
2221 public FiniteDuration getElectionTimeOutInterval() {
2222 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2226 public int getSnapshotChunkSize() {
2227 return snapshotChunkSize;