2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.verify;
22 import akka.actor.ActorRef;
23 import akka.actor.PoisonPill;
24 import akka.actor.Props;
25 import akka.actor.Terminated;
26 import akka.testkit.JavaTestKit;
27 import akka.testkit.TestActorRef;
28 import com.google.common.base.Optional;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.io.ByteSource;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import com.google.protobuf.ByteString;
33 import java.io.IOException;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.List;
39 import java.util.concurrent.TimeUnit;
40 import org.junit.After;
41 import org.junit.Test;
42 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
43 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
44 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
45 import org.opendaylight.controller.cluster.raft.RaftActorContext;
46 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
47 import org.opendaylight.controller.cluster.raft.RaftState;
48 import org.opendaylight.controller.cluster.raft.RaftVersions;
49 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
50 import org.opendaylight.controller.cluster.raft.VotingState;
51 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
52 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
53 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
54 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
55 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
56 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
57 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
58 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
59 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
60 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
61 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
62 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
63 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
64 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
65 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
66 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
67 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
68 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
69 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
70 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
71 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
72 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
73 import org.opendaylight.yangtools.concepts.Identifier;
74 import scala.concurrent.duration.FiniteDuration;
76 public class LeaderTest extends AbstractLeaderTest<Leader> {
78 static final String FOLLOWER_ID = "follower";
79 public static final String LEADER_ID = "leader";
81 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
82 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
84 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
85 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
87 private Leader leader;
88 private final short payloadVersion = 5;
92 public void tearDown() throws Exception {
101 public void testHandleMessageForUnknownMessage() throws Exception {
102 logStart("testHandleMessageForUnknownMessage");
104 leader = new Leader(createActorContext());
106 // handle message should null when it receives an unknown message
107 assertNull(leader.handleMessage(followerActor, "foo"));
111 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
112 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
114 MockRaftActorContext actorContext = createActorContextWithFollower();
115 actorContext.setCommitIndex(-1);
116 actorContext.setPayloadVersion(payloadVersion);
119 actorContext.getTermInformation().update(term, "");
121 leader = new Leader(actorContext);
122 actorContext.setCurrentBehavior(leader);
124 // Leader should send an immediate heartbeat with no entries as follower is inactive.
125 final long lastIndex = actorContext.getReplicatedLog().lastIndex();
126 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
127 assertEquals("getTerm", term, appendEntries.getTerm());
128 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
129 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
130 assertEquals("Entries size", 0, appendEntries.getEntries().size());
131 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
133 // The follower would normally reply - simulate that explicitly here.
134 leader.handleMessage(followerActor, new AppendEntriesReply(
135 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
136 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
138 followerActor.underlyingActor().clear();
140 // Sleep for the heartbeat interval so AppendEntries is sent.
141 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
142 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
144 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
146 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
147 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
148 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
149 assertEquals("Entries size", 1, appendEntries.getEntries().size());
150 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
151 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
152 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
156 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
157 return sendReplicate(actorContext, 1, index);
160 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
161 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
162 SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
163 actorContext.getReplicatedLog().append(newEntry);
164 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
168 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
169 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
171 MockRaftActorContext actorContext = createActorContextWithFollower();
174 actorContext.getTermInformation().update(term, "");
176 leader = new Leader(actorContext);
178 // Leader will send an immediate heartbeat - ignore it.
179 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
181 // The follower would normally reply - simulate that explicitly here.
182 long lastIndex = actorContext.getReplicatedLog().lastIndex();
183 leader.handleMessage(followerActor, new AppendEntriesReply(
184 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
185 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
187 followerActor.underlyingActor().clear();
189 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
191 // State should not change
192 assertTrue(raftBehavior instanceof Leader);
194 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
195 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
196 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
197 assertEquals("Entries size", 1, appendEntries.getEntries().size());
198 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
199 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
200 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
201 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
205 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
206 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
208 MockRaftActorContext actorContext = createActorContextWithFollower();
209 actorContext.setCommitIndex(-1);
210 actorContext.setLastApplied(-1);
212 // The raft context is initialized with a couple log entries. However the commitIndex
213 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
214 // committed and applied. Now it regains leadership with a higher term (2).
215 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
216 long newTerm = prevTerm + 1;
217 actorContext.getTermInformation().update(newTerm, "");
219 leader = new Leader(actorContext);
220 actorContext.setCurrentBehavior(leader);
222 // Leader will send an immediate heartbeat - ignore it.
223 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
225 // The follower replies with the leader's current last index and term, simulating that it is
226 // up to date with the leader.
227 long lastIndex = actorContext.getReplicatedLog().lastIndex();
228 leader.handleMessage(followerActor, new AppendEntriesReply(
229 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
231 // The commit index should not get updated even though consensus was reached. This is b/c the
232 // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
233 // from previous terms by counting replicas".
234 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
236 followerActor.underlyingActor().clear();
238 // Now replicate a new entry with the new term 2.
239 long newIndex = lastIndex + 1;
240 sendReplicate(actorContext, newTerm, newIndex);
242 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
243 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
244 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
245 assertEquals("Entries size", 1, appendEntries.getEntries().size());
246 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
247 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
248 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
250 // The follower replies with success. The leader should now update the commit index to the new index
251 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
252 // prior entries are committed indirectly".
253 leader.handleMessage(followerActor, new AppendEntriesReply(
254 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
256 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
260 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
261 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
263 MockRaftActorContext actorContext = createActorContextWithFollower();
264 actorContext.setRaftPolicy(createRaftPolicy(true, true));
267 actorContext.getTermInformation().update(term, "");
269 leader = new Leader(actorContext);
271 // Leader will send an immediate heartbeat - ignore it.
272 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
274 // The follower would normally reply - simulate that explicitly here.
275 long lastIndex = actorContext.getReplicatedLog().lastIndex();
276 leader.handleMessage(followerActor, new AppendEntriesReply(
277 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
278 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
280 followerActor.underlyingActor().clear();
282 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
284 // State should not change
285 assertTrue(raftBehavior instanceof Leader);
287 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
288 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
289 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
290 assertEquals("Entries size", 1, appendEntries.getEntries().size());
291 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
292 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
293 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
294 assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
298 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
299 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
301 MockRaftActorContext actorContext = createActorContextWithFollower();
302 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
304 public FiniteDuration getHeartBeatInterval() {
305 return FiniteDuration.apply(5, TimeUnit.SECONDS);
310 actorContext.getTermInformation().update(term, "");
312 leader = new Leader(actorContext);
314 // Leader will send an immediate heartbeat - ignore it.
315 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
317 // The follower would normally reply - simulate that explicitly here.
318 long lastIndex = actorContext.getReplicatedLog().lastIndex();
319 leader.handleMessage(followerActor, new AppendEntriesReply(
320 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
321 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
323 followerActor.underlyingActor().clear();
325 for (int i = 0; i < 5; i++) {
326 sendReplicate(actorContext, lastIndex + i + 1);
329 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
330 // We expect only 1 message to be sent because of two reasons,
331 // - an append entries reply was not received
332 // - the heartbeat interval has not expired
333 // In this scenario if multiple messages are sent they would likely be duplicates
334 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
338 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
339 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
341 MockRaftActorContext actorContext = createActorContextWithFollower();
342 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
344 public FiniteDuration getHeartBeatInterval() {
345 return FiniteDuration.apply(5, TimeUnit.SECONDS);
350 actorContext.getTermInformation().update(term, "");
352 leader = new Leader(actorContext);
354 // Leader will send an immediate heartbeat - ignore it.
355 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
357 // The follower would normally reply - simulate that explicitly here.
358 long lastIndex = actorContext.getReplicatedLog().lastIndex();
359 leader.handleMessage(followerActor, new AppendEntriesReply(
360 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
361 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
363 followerActor.underlyingActor().clear();
365 for (int i = 0; i < 3; i++) {
366 sendReplicate(actorContext, lastIndex + i + 1);
367 leader.handleMessage(followerActor, new AppendEntriesReply(
368 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
372 for (int i = 3; i < 5; i++) {
373 sendReplicate(actorContext, lastIndex + i + 1);
376 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
377 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
378 // get sent to the follower - but not the 5th
379 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
381 for (int i = 0; i < 4; i++) {
382 long expected = allMessages.get(i).getEntries().get(0).getIndex();
383 assertEquals(expected, i + 2);
388 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
389 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
391 MockRaftActorContext actorContext = createActorContextWithFollower();
392 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
394 public FiniteDuration getHeartBeatInterval() {
395 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
400 actorContext.getTermInformation().update(term, "");
402 leader = new Leader(actorContext);
404 // Leader will send an immediate heartbeat - ignore it.
405 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
407 // The follower would normally reply - simulate that explicitly here.
408 long lastIndex = actorContext.getReplicatedLog().lastIndex();
409 leader.handleMessage(followerActor, new AppendEntriesReply(
410 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
411 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
413 followerActor.underlyingActor().clear();
415 sendReplicate(actorContext, lastIndex + 1);
417 // Wait slightly longer than heartbeat duration
418 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
420 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
422 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
423 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
425 assertEquals(1, allMessages.get(0).getEntries().size());
426 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
427 assertEquals(1, allMessages.get(1).getEntries().size());
428 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
433 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
434 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
436 MockRaftActorContext actorContext = createActorContextWithFollower();
437 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
439 public FiniteDuration getHeartBeatInterval() {
440 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
445 actorContext.getTermInformation().update(term, "");
447 leader = new Leader(actorContext);
449 // Leader will send an immediate heartbeat - ignore it.
450 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
452 // The follower would normally reply - simulate that explicitly here.
453 long lastIndex = actorContext.getReplicatedLog().lastIndex();
454 leader.handleMessage(followerActor, new AppendEntriesReply(
455 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
456 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
458 followerActor.underlyingActor().clear();
460 for (int i = 0; i < 3; i++) {
461 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
462 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
465 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
466 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
470 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
471 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
473 MockRaftActorContext actorContext = createActorContextWithFollower();
474 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
476 public FiniteDuration getHeartBeatInterval() {
477 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
482 actorContext.getTermInformation().update(term, "");
484 leader = new Leader(actorContext);
486 // Leader will send an immediate heartbeat - ignore it.
487 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
489 // The follower would normally reply - simulate that explicitly here.
490 long lastIndex = actorContext.getReplicatedLog().lastIndex();
491 leader.handleMessage(followerActor, new AppendEntriesReply(
492 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
493 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
495 followerActor.underlyingActor().clear();
497 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
498 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
499 sendReplicate(actorContext, lastIndex + 1);
501 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
502 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
504 assertEquals(0, allMessages.get(0).getEntries().size());
505 assertEquals(1, allMessages.get(1).getEntries().size());
510 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
511 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
513 MockRaftActorContext actorContext = createActorContext();
515 leader = new Leader(actorContext);
517 actorContext.setLastApplied(0);
519 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
520 long term = actorContext.getTermInformation().getCurrentTerm();
521 ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
522 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
524 actorContext.getReplicatedLog().append(newEntry);
526 final Identifier id = new MockIdentifier("state-id");
527 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
528 new Replicate(leaderActor, id, newEntry, true));
530 // State should not change
531 assertTrue(raftBehavior instanceof Leader);
533 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
535 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
536 // one since lastApplied state is 0.
537 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
538 leaderActor, ApplyState.class);
539 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
541 for (int i = 0; i <= newLogIndex - 1; i++) {
542 ApplyState applyState = applyStateList.get(i);
543 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
544 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
547 ApplyState last = applyStateList.get((int) newLogIndex - 1);
548 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
549 assertEquals("getIdentifier", id, last.getIdentifier());
553 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
554 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
556 final MockRaftActorContext actorContext = createActorContextWithFollower();
558 Map<String, String> leadersSnapshot = new HashMap<>();
559 leadersSnapshot.put("1", "A");
560 leadersSnapshot.put("2", "B");
561 leadersSnapshot.put("3", "C");
564 actorContext.getReplicatedLog().removeFrom(0);
566 final int commitIndex = 3;
567 final int snapshotIndex = 2;
568 final int snapshotTerm = 1;
570 // set the snapshot variables in replicatedlog
571 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
572 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
573 actorContext.setCommitIndex(commitIndex);
574 //set follower timeout to 2 mins, helps during debugging
575 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
577 leader = new Leader(actorContext);
579 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
580 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
582 //update follower timestamp
583 leader.markFollowerActive(FOLLOWER_ID);
585 ByteString bs = toByteString(leadersSnapshot);
586 leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
587 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
588 -1, null, null), ByteSource.wrap(bs.toByteArray())));
589 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
590 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
591 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
592 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
594 //send first chunk and no InstallSnapshotReply received yet
596 fts.incrementChunkIndex();
598 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
599 TimeUnit.MILLISECONDS);
601 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
603 AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
605 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
607 //InstallSnapshotReply received
608 fts.markSendStatus(true);
610 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
612 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
614 assertEquals(commitIndex, is.getLastIncludedIndex());
618 public void testSendAppendEntriesSnapshotScenario() throws Exception {
619 logStart("testSendAppendEntriesSnapshotScenario");
621 final MockRaftActorContext actorContext = createActorContextWithFollower();
623 Map<String, String> leadersSnapshot = new HashMap<>();
624 leadersSnapshot.put("1", "A");
625 leadersSnapshot.put("2", "B");
626 leadersSnapshot.put("3", "C");
629 actorContext.getReplicatedLog().removeFrom(0);
631 final int followersLastIndex = 2;
632 final int snapshotIndex = 3;
633 final int newEntryIndex = 4;
634 final int snapshotTerm = 1;
635 final int currentTerm = 2;
637 // set the snapshot variables in replicatedlog
638 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
639 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
640 actorContext.setCommitIndex(followersLastIndex);
642 leader = new Leader(actorContext);
644 // Leader will send an immediate heartbeat - ignore it.
645 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
648 SimpleReplicatedLogEntry entry =
649 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
650 new MockRaftActorContext.MockPayload("D"));
652 actorContext.getReplicatedLog().append(entry);
654 //update follower timestamp
655 leader.markFollowerActive(FOLLOWER_ID);
657 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
658 RaftActorBehavior raftBehavior = leader.handleMessage(
659 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
661 assertTrue(raftBehavior instanceof Leader);
663 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
667 public void testInitiateInstallSnapshot() throws Exception {
668 logStart("testInitiateInstallSnapshot");
670 MockRaftActorContext actorContext = createActorContextWithFollower();
673 actorContext.getReplicatedLog().removeFrom(0);
675 final int followersLastIndex = 2;
676 final int snapshotIndex = 3;
677 final int newEntryIndex = 4;
678 final int snapshotTerm = 1;
679 final int currentTerm = 2;
681 // set the snapshot variables in replicatedlog
682 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
683 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
684 actorContext.setLastApplied(3);
685 actorContext.setCommitIndex(followersLastIndex);
687 leader = new Leader(actorContext);
689 // Leader will send an immediate heartbeat - ignore it.
690 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
692 // set the snapshot as absent and check if capture-snapshot is invoked.
693 leader.setSnapshot(null);
696 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
697 new MockRaftActorContext.MockPayload("D"));
699 actorContext.getReplicatedLog().append(entry);
701 //update follower timestamp
702 leader.markFollowerActive(FOLLOWER_ID);
704 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
706 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
708 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
710 assertEquals(3, cs.getLastAppliedIndex());
711 assertEquals(1, cs.getLastAppliedTerm());
712 assertEquals(4, cs.getLastIndex());
713 assertEquals(2, cs.getLastTerm());
715 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
716 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
718 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
722 public void testInitiateForceInstallSnapshot() throws Exception {
723 logStart("testInitiateForceInstallSnapshot");
725 MockRaftActorContext actorContext = createActorContextWithFollower();
727 final int followersLastIndex = 2;
728 final int snapshotIndex = -1;
729 final int newEntryIndex = 4;
730 final int snapshotTerm = -1;
731 final int currentTerm = 2;
733 // set the snapshot variables in replicatedlog
734 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
735 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
736 actorContext.setLastApplied(3);
737 actorContext.setCommitIndex(followersLastIndex);
739 actorContext.getReplicatedLog().removeFrom(0);
741 leader = new Leader(actorContext);
742 actorContext.setCurrentBehavior(leader);
744 // Leader will send an immediate heartbeat - ignore it.
745 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
747 // set the snapshot as absent and check if capture-snapshot is invoked.
748 leader.setSnapshot(null);
750 for (int i = 0; i < 4; i++) {
751 actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
752 new MockRaftActorContext.MockPayload("X" + i)));
756 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
757 new MockRaftActorContext.MockPayload("D"));
759 actorContext.getReplicatedLog().append(entry);
761 //update follower timestamp
762 leader.markFollowerActive(FOLLOWER_ID);
764 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
765 // installed with a SendInstallSnapshot
766 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
768 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
770 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
772 assertEquals(3, cs.getLastAppliedIndex());
773 assertEquals(1, cs.getLastAppliedTerm());
774 assertEquals(4, cs.getLastIndex());
775 assertEquals(2, cs.getLastTerm());
777 // if an initiate is started again when first is in progress, it should not initiate Capture
778 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
780 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
785 public void testInstallSnapshot() throws Exception {
786 logStart("testInstallSnapshot");
788 final MockRaftActorContext actorContext = createActorContextWithFollower();
790 Map<String, String> leadersSnapshot = new HashMap<>();
791 leadersSnapshot.put("1", "A");
792 leadersSnapshot.put("2", "B");
793 leadersSnapshot.put("3", "C");
796 actorContext.getReplicatedLog().removeFrom(0);
798 final int lastAppliedIndex = 3;
799 final int snapshotIndex = 2;
800 final int snapshotTerm = 1;
801 final int currentTerm = 2;
803 // set the snapshot variables in replicatedlog
804 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
805 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
806 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
807 actorContext.setCommitIndex(lastAppliedIndex);
808 actorContext.setLastApplied(lastAppliedIndex);
810 leader = new Leader(actorContext);
812 // Initial heartbeat.
813 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
815 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
816 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
818 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
819 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
820 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
822 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
823 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
825 assertTrue(raftBehavior instanceof Leader);
827 // check if installsnapshot gets called with the correct values.
829 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
830 InstallSnapshot.class);
832 assertNotNull(installSnapshot.getData());
833 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
834 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
836 assertEquals(currentTerm, installSnapshot.getTerm());
840 public void testForceInstallSnapshot() throws Exception {
841 logStart("testForceInstallSnapshot");
843 final MockRaftActorContext actorContext = createActorContextWithFollower();
845 Map<String, String> leadersSnapshot = new HashMap<>();
846 leadersSnapshot.put("1", "A");
847 leadersSnapshot.put("2", "B");
848 leadersSnapshot.put("3", "C");
850 final int lastAppliedIndex = 3;
851 final int snapshotIndex = -1;
852 final int snapshotTerm = -1;
853 final int currentTerm = 2;
855 // set the snapshot variables in replicatedlog
856 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
857 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
858 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
859 actorContext.setCommitIndex(lastAppliedIndex);
860 actorContext.setLastApplied(lastAppliedIndex);
862 leader = new Leader(actorContext);
864 // Initial heartbeat.
865 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
867 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
868 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
870 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
871 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
872 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
874 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
875 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
877 assertTrue(raftBehavior instanceof Leader);
879 // check if installsnapshot gets called with the correct values.
881 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
882 InstallSnapshot.class);
884 assertNotNull(installSnapshot.getData());
885 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
886 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
888 assertEquals(currentTerm, installSnapshot.getTerm());
892 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
893 logStart("testHandleInstallSnapshotReplyLastChunk");
895 MockRaftActorContext actorContext = createActorContextWithFollower();
897 final int commitIndex = 3;
898 final int snapshotIndex = 2;
899 final int snapshotTerm = 1;
900 final int currentTerm = 2;
902 actorContext.setCommitIndex(commitIndex);
904 leader = new Leader(actorContext);
905 actorContext.setCurrentBehavior(leader);
907 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
908 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
910 // Ignore initial heartbeat.
911 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
913 Map<String, String> leadersSnapshot = new HashMap<>();
914 leadersSnapshot.put("1", "A");
915 leadersSnapshot.put("2", "B");
916 leadersSnapshot.put("3", "C");
918 // set the snapshot variables in replicatedlog
920 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
921 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
922 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
924 ByteString bs = toByteString(leadersSnapshot);
925 leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
926 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
927 -1, null, null), ByteSource.wrap(bs.toByteArray())));
928 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
929 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
930 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
931 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
932 while (!fts.isLastChunk(fts.getChunkIndex())) {
934 fts.incrementChunkIndex();
938 actorContext.getReplicatedLog().removeFrom(0);
940 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
941 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
943 assertTrue(raftBehavior instanceof Leader);
945 assertEquals(1, leader.followerLogSize());
946 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
948 assertNull(fli.getInstallSnapshotState());
949 assertEquals(commitIndex, fli.getMatchIndex());
950 assertEquals(commitIndex + 1, fli.getNextIndex());
951 assertFalse(leader.hasSnapshot());
955 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
956 logStart("testSendSnapshotfromInstallSnapshotReply");
958 MockRaftActorContext actorContext = createActorContextWithFollower();
960 final int commitIndex = 3;
961 final int snapshotIndex = 2;
962 final int snapshotTerm = 1;
963 final int currentTerm = 2;
965 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
967 public int getSnapshotChunkSize() {
971 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
972 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
974 actorContext.setConfigParams(configParams);
975 actorContext.setCommitIndex(commitIndex);
977 leader = new Leader(actorContext);
978 actorContext.setCurrentBehavior(leader);
980 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
981 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
983 Map<String, String> leadersSnapshot = new HashMap<>();
984 leadersSnapshot.put("1", "A");
985 leadersSnapshot.put("2", "B");
986 leadersSnapshot.put("3", "C");
988 // set the snapshot variables in replicatedlog
989 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
990 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
991 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
993 ByteString bs = toByteString(leadersSnapshot);
994 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
995 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
998 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1000 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1001 InstallSnapshot.class);
1003 assertEquals(1, installSnapshot.getChunkIndex());
1004 assertEquals(3, installSnapshot.getTotalChunks());
1006 followerActor.underlyingActor().clear();
1007 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1008 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1010 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1012 assertEquals(2, installSnapshot.getChunkIndex());
1013 assertEquals(3, installSnapshot.getTotalChunks());
1015 followerActor.underlyingActor().clear();
1016 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1017 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1019 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1021 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1022 followerActor.underlyingActor().clear();
1023 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1024 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1026 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1028 assertNull(installSnapshot);
1033 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
1034 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1036 MockRaftActorContext actorContext = createActorContextWithFollower();
1038 final int commitIndex = 3;
1039 final int snapshotIndex = 2;
1040 final int snapshotTerm = 1;
1041 final int currentTerm = 2;
1043 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1045 public int getSnapshotChunkSize() {
1050 actorContext.setCommitIndex(commitIndex);
1052 leader = new Leader(actorContext);
1054 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1055 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1057 Map<String, String> leadersSnapshot = new HashMap<>();
1058 leadersSnapshot.put("1", "A");
1059 leadersSnapshot.put("2", "B");
1060 leadersSnapshot.put("3", "C");
1062 // set the snapshot variables in replicatedlog
1063 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1064 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1065 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1067 ByteString bs = toByteString(leadersSnapshot);
1068 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1069 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1072 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1073 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1075 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1076 InstallSnapshot.class);
1078 assertEquals(1, installSnapshot.getChunkIndex());
1079 assertEquals(3, installSnapshot.getTotalChunks());
1081 followerActor.underlyingActor().clear();
1083 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1084 FOLLOWER_ID, -1, false));
1086 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1087 TimeUnit.MILLISECONDS);
1089 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1091 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1093 assertEquals(1, installSnapshot.getChunkIndex());
1094 assertEquals(3, installSnapshot.getTotalChunks());
1098 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1099 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1101 MockRaftActorContext actorContext = createActorContextWithFollower();
1103 final int commitIndex = 3;
1104 final int snapshotIndex = 2;
1105 final int snapshotTerm = 1;
1106 final int currentTerm = 2;
1108 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1110 public int getSnapshotChunkSize() {
1115 actorContext.setCommitIndex(commitIndex);
1117 leader = new Leader(actorContext);
1119 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1120 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1122 Map<String, String> leadersSnapshot = new HashMap<>();
1123 leadersSnapshot.put("1", "A");
1124 leadersSnapshot.put("2", "B");
1125 leadersSnapshot.put("3", "C");
1127 // set the snapshot variables in replicatedlog
1128 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1129 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1130 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1132 ByteString bs = toByteString(leadersSnapshot);
1133 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1134 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1137 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1139 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1140 InstallSnapshot.class);
1142 assertEquals(1, installSnapshot.getChunkIndex());
1143 assertEquals(3, installSnapshot.getTotalChunks());
1144 assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1145 installSnapshot.getLastChunkHashCode().get().intValue());
1147 final int hashCode = Arrays.hashCode(installSnapshot.getData());
1149 followerActor.underlyingActor().clear();
1151 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1152 FOLLOWER_ID, 1, true));
1154 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1156 assertEquals(2, installSnapshot.getChunkIndex());
1157 assertEquals(3, installSnapshot.getTotalChunks());
1158 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1162 public void testLeaderInstallSnapshotState() throws IOException {
1163 logStart("testLeaderInstallSnapshotState");
1165 Map<String, String> leadersSnapshot = new HashMap<>();
1166 leadersSnapshot.put("1", "A");
1167 leadersSnapshot.put("2", "B");
1168 leadersSnapshot.put("3", "C");
1170 ByteString bs = toByteString(leadersSnapshot);
1171 byte[] barray = bs.toByteArray();
1173 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1174 fts.setSnapshotBytes(ByteSource.wrap(barray));
1176 assertEquals(bs.size(), barray.length);
1179 for (int i = 0; i < barray.length; i = i + 50) {
1180 int length = i + 50;
1183 if (i + 50 > barray.length) {
1184 length = barray.length;
1187 byte[] chunk = fts.getNextChunk();
1188 assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1189 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1191 fts.markSendStatus(true);
1192 if (!fts.isLastChunk(chunkIndex)) {
1193 fts.incrementChunkIndex();
1197 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1202 protected Leader createBehavior(final RaftActorContext actorContext) {
1203 return new Leader(actorContext);
1207 protected MockRaftActorContext createActorContext() {
1208 return createActorContext(leaderActor);
1212 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1213 return createActorContext(LEADER_ID, actorRef);
1216 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1217 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1218 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1219 configParams.setElectionTimeoutFactor(100000);
1220 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1221 context.setConfigParams(configParams);
1222 context.setPayloadVersion(payloadVersion);
1226 private MockRaftActorContext createActorContextWithFollower() {
1227 MockRaftActorContext actorContext = createActorContext();
1228 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1229 followerActor.path().toString()).build());
1230 return actorContext;
1233 private MockRaftActorContext createFollowerActorContextWithLeader() {
1234 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1235 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1236 followerConfig.setElectionTimeoutFactor(10000);
1237 followerActorContext.setConfigParams(followerConfig);
1238 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1239 return followerActorContext;
1243 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1244 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1246 final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1248 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1250 Follower follower = new Follower(followerActorContext);
1251 followerActor.underlyingActor().setBehavior(follower);
1252 followerActorContext.setCurrentBehavior(follower);
1254 Map<String, String> peerAddresses = new HashMap<>();
1255 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1257 leaderActorContext.setPeerAddresses(peerAddresses);
1259 leaderActorContext.getReplicatedLog().removeFrom(0);
1262 leaderActorContext.setReplicatedLog(
1263 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1265 leaderActorContext.setCommitIndex(1);
1267 followerActorContext.getReplicatedLog().removeFrom(0);
1269 // follower too has the exact same log entries and has the same commit index
1270 followerActorContext.setReplicatedLog(
1271 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1273 followerActorContext.setCommitIndex(1);
1275 leader = new Leader(leaderActorContext);
1276 leaderActorContext.setCurrentBehavior(leader);
1278 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1280 assertEquals(-1, appendEntries.getLeaderCommit());
1281 assertEquals(0, appendEntries.getEntries().size());
1282 assertEquals(0, appendEntries.getPrevLogIndex());
1284 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1285 leaderActor, AppendEntriesReply.class);
1287 assertEquals(2, appendEntriesReply.getLogLastIndex());
1288 assertEquals(1, appendEntriesReply.getLogLastTerm());
1290 // follower returns its next index
1291 assertEquals(2, appendEntriesReply.getLogLastIndex());
1292 assertEquals(1, appendEntriesReply.getLogLastTerm());
1298 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1299 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1301 final MockRaftActorContext leaderActorContext = createActorContext();
1303 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1304 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1306 Follower follower = new Follower(followerActorContext);
1307 followerActor.underlyingActor().setBehavior(follower);
1308 followerActorContext.setCurrentBehavior(follower);
1310 Map<String, String> leaderPeerAddresses = new HashMap<>();
1311 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1313 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1315 leaderActorContext.getReplicatedLog().removeFrom(0);
1317 leaderActorContext.setReplicatedLog(
1318 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1320 leaderActorContext.setCommitIndex(1);
1322 followerActorContext.getReplicatedLog().removeFrom(0);
1324 followerActorContext.setReplicatedLog(
1325 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1327 // follower has the same log entries but its commit index > leaders commit index
1328 followerActorContext.setCommitIndex(2);
1330 leader = new Leader(leaderActorContext);
1332 // Initial heartbeat
1333 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1335 assertEquals(-1, appendEntries.getLeaderCommit());
1336 assertEquals(0, appendEntries.getEntries().size());
1337 assertEquals(0, appendEntries.getPrevLogIndex());
1339 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1340 leaderActor, AppendEntriesReply.class);
1342 assertEquals(2, appendEntriesReply.getLogLastIndex());
1343 assertEquals(1, appendEntriesReply.getLogLastTerm());
1345 leaderActor.underlyingActor().setBehavior(follower);
1346 leader.handleMessage(followerActor, appendEntriesReply);
1348 leaderActor.underlyingActor().clear();
1349 followerActor.underlyingActor().clear();
1351 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1352 TimeUnit.MILLISECONDS);
1354 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1356 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1358 assertEquals(2, appendEntries.getLeaderCommit());
1359 assertEquals(0, appendEntries.getEntries().size());
1360 assertEquals(2, appendEntries.getPrevLogIndex());
1362 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1364 assertEquals(2, appendEntriesReply.getLogLastIndex());
1365 assertEquals(1, appendEntriesReply.getLogLastTerm());
1367 assertEquals(2, followerActorContext.getCommitIndex());
1373 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1374 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1376 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1377 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1378 new FiniteDuration(1000, TimeUnit.SECONDS));
1380 leaderActorContext.setReplicatedLog(
1381 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1382 long leaderCommitIndex = 2;
1383 leaderActorContext.setCommitIndex(leaderCommitIndex);
1384 leaderActorContext.setLastApplied(leaderCommitIndex);
1386 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1387 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1389 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1391 followerActorContext.setReplicatedLog(
1392 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1393 followerActorContext.setCommitIndex(0);
1394 followerActorContext.setLastApplied(0);
1396 Follower follower = new Follower(followerActorContext);
1397 followerActor.underlyingActor().setBehavior(follower);
1399 leader = new Leader(leaderActorContext);
1401 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1402 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1403 AppendEntriesReply.class);
1405 MessageCollectorActor.clearMessages(followerActor);
1406 MessageCollectorActor.clearMessages(leaderActor);
1408 // Verify initial AppendEntries sent.
1409 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1410 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1411 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1413 leaderActor.underlyingActor().setBehavior(leader);
1415 leader.handleMessage(followerActor, appendEntriesReply);
1417 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1418 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1420 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1421 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1422 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1424 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1425 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1426 appendEntries.getEntries().get(0).getData());
1427 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1428 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1429 appendEntries.getEntries().get(1).getData());
1431 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1432 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1434 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1436 ApplyState applyState = applyStateList.get(0);
1437 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1438 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1439 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1440 applyState.getReplicatedLogEntry().getData());
1442 applyState = applyStateList.get(1);
1443 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1444 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1445 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1446 applyState.getReplicatedLogEntry().getData());
1448 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1449 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1453 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1454 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1456 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1457 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1458 new FiniteDuration(1000, TimeUnit.SECONDS));
1460 leaderActorContext.setReplicatedLog(
1461 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1462 long leaderCommitIndex = 1;
1463 leaderActorContext.setCommitIndex(leaderCommitIndex);
1464 leaderActorContext.setLastApplied(leaderCommitIndex);
1466 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1467 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1469 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1471 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1472 followerActorContext.setCommitIndex(-1);
1473 followerActorContext.setLastApplied(-1);
1475 Follower follower = new Follower(followerActorContext);
1476 followerActor.underlyingActor().setBehavior(follower);
1477 followerActorContext.setCurrentBehavior(follower);
1479 leader = new Leader(leaderActorContext);
1481 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1482 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1483 AppendEntriesReply.class);
1485 MessageCollectorActor.clearMessages(followerActor);
1486 MessageCollectorActor.clearMessages(leaderActor);
1488 // Verify initial AppendEntries sent with the leader's current commit index.
1489 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1490 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1491 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1493 leaderActor.underlyingActor().setBehavior(leader);
1494 leaderActorContext.setCurrentBehavior(leader);
1496 leader.handleMessage(followerActor, appendEntriesReply);
1498 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1499 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1501 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1502 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1503 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1505 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1506 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1507 appendEntries.getEntries().get(0).getData());
1508 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1509 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1510 appendEntries.getEntries().get(1).getData());
1512 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1513 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1515 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1517 ApplyState applyState = applyStateList.get(0);
1518 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1519 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1520 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1521 applyState.getReplicatedLogEntry().getData());
1523 applyState = applyStateList.get(1);
1524 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1525 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1526 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1527 applyState.getReplicatedLogEntry().getData());
1529 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1530 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1534 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1535 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1537 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1538 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1539 new FiniteDuration(1000, TimeUnit.SECONDS));
1541 leaderActorContext.setReplicatedLog(
1542 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1543 long leaderCommitIndex = 1;
1544 leaderActorContext.setCommitIndex(leaderCommitIndex);
1545 leaderActorContext.setLastApplied(leaderCommitIndex);
1547 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1548 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1550 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1552 followerActorContext.setReplicatedLog(
1553 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1554 followerActorContext.setCommitIndex(-1);
1555 followerActorContext.setLastApplied(-1);
1557 Follower follower = new Follower(followerActorContext);
1558 followerActor.underlyingActor().setBehavior(follower);
1559 followerActorContext.setCurrentBehavior(follower);
1561 leader = new Leader(leaderActorContext);
1563 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1564 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1565 AppendEntriesReply.class);
1567 MessageCollectorActor.clearMessages(followerActor);
1568 MessageCollectorActor.clearMessages(leaderActor);
1570 // Verify initial AppendEntries sent with the leader's current commit index.
1571 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1572 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1573 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1575 leaderActor.underlyingActor().setBehavior(leader);
1576 leaderActorContext.setCurrentBehavior(leader);
1578 leader.handleMessage(followerActor, appendEntriesReply);
1580 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1581 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1583 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1584 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1585 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1587 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1588 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1589 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1590 appendEntries.getEntries().get(0).getData());
1591 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1592 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1593 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1594 appendEntries.getEntries().get(1).getData());
1596 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1597 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1599 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1601 ApplyState applyState = applyStateList.get(0);
1602 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1603 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1604 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1605 applyState.getReplicatedLogEntry().getData());
1607 applyState = applyStateList.get(1);
1608 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1609 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1610 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1611 applyState.getReplicatedLogEntry().getData());
1613 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1614 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1615 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1619 public void testHandleAppendEntriesReplyWithNewerTerm() {
1620 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1622 MockRaftActorContext leaderActorContext = createActorContext();
1623 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1624 new FiniteDuration(10000, TimeUnit.SECONDS));
1626 leaderActorContext.setReplicatedLog(
1627 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1629 leader = new Leader(leaderActorContext);
1630 leaderActor.underlyingActor().setBehavior(leader);
1631 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1633 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1634 AppendEntriesReply.class);
1636 assertEquals(false, appendEntriesReply.isSuccess());
1637 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1639 MessageCollectorActor.clearMessages(leaderActor);
1643 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1644 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1646 MockRaftActorContext leaderActorContext = createActorContext();
1647 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1648 new FiniteDuration(10000, TimeUnit.SECONDS));
1650 leaderActorContext.setReplicatedLog(
1651 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1652 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1654 leader = new Leader(leaderActorContext);
1655 leaderActor.underlyingActor().setBehavior(leader);
1656 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1658 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1659 AppendEntriesReply.class);
1661 assertEquals(false, appendEntriesReply.isSuccess());
1662 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1664 MessageCollectorActor.clearMessages(leaderActor);
1668 public void testHandleAppendEntriesReplySuccess() throws Exception {
1669 logStart("testHandleAppendEntriesReplySuccess");
1671 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1673 leaderActorContext.setReplicatedLog(
1674 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1676 leaderActorContext.setCommitIndex(1);
1677 leaderActorContext.setLastApplied(1);
1678 leaderActorContext.getTermInformation().update(1, "leader");
1680 leader = new Leader(leaderActorContext);
1682 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1684 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1685 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1687 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1689 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1691 assertEquals(RaftState.Leader, raftActorBehavior.state());
1693 assertEquals(2, leaderActorContext.getCommitIndex());
1695 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1696 leaderActor, ApplyJournalEntries.class);
1698 assertEquals(2, leaderActorContext.getLastApplied());
1700 assertEquals(2, applyJournalEntries.getToIndex());
1702 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1705 assertEquals(1,applyStateList.size());
1707 ApplyState applyState = applyStateList.get(0);
1709 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1711 assertEquals(2, followerInfo.getMatchIndex());
1712 assertEquals(3, followerInfo.getNextIndex());
1713 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1714 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1718 public void testHandleAppendEntriesReplyUnknownFollower() {
1719 logStart("testHandleAppendEntriesReplyUnknownFollower");
1721 MockRaftActorContext leaderActorContext = createActorContext();
1723 leader = new Leader(leaderActorContext);
1725 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1727 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1729 assertEquals(RaftState.Leader, raftActorBehavior.state());
1733 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1734 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1736 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1737 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1738 new FiniteDuration(1000, TimeUnit.SECONDS));
1739 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1741 leaderActorContext.setReplicatedLog(
1742 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1743 long leaderCommitIndex = 3;
1744 leaderActorContext.setCommitIndex(leaderCommitIndex);
1745 leaderActorContext.setLastApplied(leaderCommitIndex);
1747 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1748 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1749 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1750 final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1752 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1754 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1755 followerActorContext.setCommitIndex(-1);
1756 followerActorContext.setLastApplied(-1);
1758 Follower follower = new Follower(followerActorContext);
1759 followerActor.underlyingActor().setBehavior(follower);
1760 followerActorContext.setCurrentBehavior(follower);
1762 leader = new Leader(leaderActorContext);
1764 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1765 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1766 AppendEntriesReply.class);
1768 MessageCollectorActor.clearMessages(followerActor);
1769 MessageCollectorActor.clearMessages(leaderActor);
1771 // Verify initial AppendEntries sent with the leader's current commit index.
1772 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1773 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1774 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1776 leaderActor.underlyingActor().setBehavior(leader);
1777 leaderActorContext.setCurrentBehavior(leader);
1779 leader.handleMessage(followerActor, appendEntriesReply);
1781 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1782 AppendEntries.class, 2);
1783 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1785 appendEntries = appendEntriesList.get(0);
1786 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1787 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1788 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1790 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1791 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1792 appendEntries.getEntries().get(0).getData());
1793 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1794 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1795 appendEntries.getEntries().get(1).getData());
1797 appendEntries = appendEntriesList.get(1);
1798 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1799 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1800 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1802 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1803 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1804 appendEntries.getEntries().get(0).getData());
1805 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1806 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1807 appendEntries.getEntries().get(1).getData());
1809 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1810 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1812 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1814 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1815 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1819 public void testHandleRequestVoteReply() {
1820 logStart("testHandleRequestVoteReply");
1822 MockRaftActorContext leaderActorContext = createActorContext();
1824 leader = new Leader(leaderActorContext);
1826 // Should be a no-op.
1827 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1828 new RequestVoteReply(1, true));
1830 assertEquals(RaftState.Leader, raftActorBehavior.state());
1832 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1834 assertEquals(RaftState.Leader, raftActorBehavior.state());
1838 public void testIsolatedLeaderCheckNoFollowers() {
1839 logStart("testIsolatedLeaderCheckNoFollowers");
1841 MockRaftActorContext leaderActorContext = createActorContext();
1843 leader = new Leader(leaderActorContext);
1844 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1845 assertTrue(newBehavior instanceof Leader);
1849 public void testIsolatedLeaderCheckNoVotingFollowers() {
1850 logStart("testIsolatedLeaderCheckNoVotingFollowers");
1852 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1853 Follower follower = new Follower(followerActorContext);
1854 followerActor.underlyingActor().setBehavior(follower);
1856 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1857 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1858 new FiniteDuration(1000, TimeUnit.SECONDS));
1859 leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1861 leader = new Leader(leaderActorContext);
1862 leader.getFollower(FOLLOWER_ID).markFollowerActive();
1863 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1864 assertTrue("Expected Leader", newBehavior instanceof Leader);
1867 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
1868 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1869 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1871 MockRaftActorContext leaderActorContext = createActorContext();
1873 Map<String, String> peerAddresses = new HashMap<>();
1874 peerAddresses.put("follower-1", followerActor1.path().toString());
1875 peerAddresses.put("follower-2", followerActor2.path().toString());
1877 leaderActorContext.setPeerAddresses(peerAddresses);
1878 leaderActorContext.setRaftPolicy(raftPolicy);
1880 leader = new Leader(leaderActorContext);
1882 leader.markFollowerActive("follower-1");
1883 leader.markFollowerActive("follower-2");
1884 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1885 assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1887 // kill 1 follower and verify if that got killed
1888 final JavaTestKit probe = new JavaTestKit(getSystem());
1889 probe.watch(followerActor1);
1890 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1891 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1892 assertEquals(termMsg1.getActor(), followerActor1);
1894 leader.markFollowerInActive("follower-1");
1895 leader.markFollowerActive("follower-2");
1896 newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1897 assertTrue("Behavior not instance of Leader when majority of followers are active",
1898 newBehavior instanceof Leader);
1900 // kill 2nd follower and leader should change to Isolated leader
1901 followerActor2.tell(PoisonPill.getInstance(), null);
1902 probe.watch(followerActor2);
1903 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1904 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1905 assertEquals(termMsg2.getActor(), followerActor2);
1907 leader.markFollowerInActive("follower-2");
1908 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1912 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1913 logStart("testIsolatedLeaderCheckTwoFollowers");
1915 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1917 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1918 newBehavior instanceof IsolatedLeader);
1922 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1923 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1925 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1927 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1928 newBehavior instanceof Leader);
1932 public void testLaggingFollowerStarvation() throws Exception {
1933 logStart("testLaggingFollowerStarvation");
1935 String leaderActorId = actorFactory.generateActorId("leader");
1936 String follower1ActorId = actorFactory.generateActorId("follower");
1937 String follower2ActorId = actorFactory.generateActorId("follower");
1939 final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1940 final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1942 MockRaftActorContext leaderActorContext =
1943 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1945 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1946 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1947 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1949 leaderActorContext.setConfigParams(configParams);
1951 leaderActorContext.setReplicatedLog(
1952 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1954 Map<String, String> peerAddresses = new HashMap<>();
1955 peerAddresses.put(follower1ActorId,
1956 follower1Actor.path().toString());
1957 peerAddresses.put(follower2ActorId,
1958 follower2Actor.path().toString());
1960 leaderActorContext.setPeerAddresses(peerAddresses);
1961 leaderActorContext.getTermInformation().update(1, leaderActorId);
1963 leader = createBehavior(leaderActorContext);
1965 leaderActor.underlyingActor().setBehavior(leader);
1967 for (int i = 1; i < 6; i++) {
1968 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1969 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
1970 new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1971 assertTrue(newBehavior == leader);
1972 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1975 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1976 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1978 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1979 heartbeats.size() > 1);
1981 // Check if follower-2 got AppendEntries during this time and was not starved
1982 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1984 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1985 appendEntries.size() > 1);
1989 public void testReplicationConsensusWithNonVotingFollower() {
1990 logStart("testReplicationConsensusWithNonVotingFollower");
1992 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1993 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1994 new FiniteDuration(1000, TimeUnit.SECONDS));
1996 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1997 leaderActorContext.setCommitIndex(-1);
1998 leaderActorContext.setLastApplied(-1);
2000 String nonVotingFollowerId = "nonvoting-follower";
2001 TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
2002 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
2004 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2005 VotingState.NON_VOTING);
2007 leader = new Leader(leaderActorContext);
2008 leaderActorContext.setCurrentBehavior(leader);
2010 // Ignore initial heartbeats
2011 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2012 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2014 MessageCollectorActor.clearMessages(followerActor);
2015 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2016 MessageCollectorActor.clearMessages(leaderActor);
2018 // Send a Replicate message and wait for AppendEntries.
2019 sendReplicate(leaderActorContext, 0);
2021 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2022 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2024 // Send reply only from the voting follower and verify consensus via ApplyState.
2025 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2027 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2029 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2031 MessageCollectorActor.clearMessages(followerActor);
2032 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2033 MessageCollectorActor.clearMessages(leaderActor);
2035 // Send another Replicate message
2036 sendReplicate(leaderActorContext, 1);
2038 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2039 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2040 AppendEntries.class);
2041 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2042 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2044 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2045 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2047 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2049 // Send reply from the voting follower and verify consensus.
2050 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2052 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2056 public void testTransferLeadershipWithFollowerInSync() {
2057 logStart("testTransferLeadershipWithFollowerInSync");
2059 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2060 leaderActorContext.setLastApplied(-1);
2061 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2062 new FiniteDuration(1000, TimeUnit.SECONDS));
2063 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2065 leader = new Leader(leaderActorContext);
2066 leaderActorContext.setCurrentBehavior(leader);
2068 // Initial heartbeat
2069 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2070 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2071 MessageCollectorActor.clearMessages(followerActor);
2073 sendReplicate(leaderActorContext, 0);
2074 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2076 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2077 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2078 MessageCollectorActor.clearMessages(followerActor);
2080 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2081 leader.transferLeadership(mockTransferCohort);
2083 verify(mockTransferCohort, never()).transferComplete();
2084 doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2085 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2086 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2088 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2089 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2091 // Leader should force an election timeout
2092 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2094 verify(mockTransferCohort).transferComplete();
2098 public void testTransferLeadershipWithEmptyLog() {
2099 logStart("testTransferLeadershipWithEmptyLog");
2101 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2102 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2103 new FiniteDuration(1000, TimeUnit.SECONDS));
2104 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2106 leader = new Leader(leaderActorContext);
2107 leaderActorContext.setCurrentBehavior(leader);
2109 // Initial heartbeat
2110 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2111 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2112 MessageCollectorActor.clearMessages(followerActor);
2114 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2115 doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2116 leader.transferLeadership(mockTransferCohort);
2118 verify(mockTransferCohort, never()).transferComplete();
2119 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2120 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2122 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2123 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2125 // Leader should force an election timeout
2126 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2128 verify(mockTransferCohort).transferComplete();
2132 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2133 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2135 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2136 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2137 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2139 leader = new Leader(leaderActorContext);
2140 leaderActorContext.setCurrentBehavior(leader);
2142 // Initial heartbeat
2143 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2144 MessageCollectorActor.clearMessages(followerActor);
2146 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2147 doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2148 leader.transferLeadership(mockTransferCohort);
2150 verify(mockTransferCohort, never()).transferComplete();
2152 // Sync up the follower.
2153 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2154 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2155 MessageCollectorActor.clearMessages(followerActor);
2157 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2158 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2159 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2160 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2161 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2163 // Leader should force an election timeout
2164 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2166 verify(mockTransferCohort).transferComplete();
2170 public void testTransferLeadershipWithFollowerSyncTimeout() {
2171 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2173 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2174 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2175 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2176 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2177 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2179 leader = new Leader(leaderActorContext);
2180 leaderActorContext.setCurrentBehavior(leader);
2182 // Initial heartbeat
2183 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2184 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2185 MessageCollectorActor.clearMessages(followerActor);
2187 sendReplicate(leaderActorContext, 0);
2188 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2190 MessageCollectorActor.clearMessages(followerActor);
2192 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2193 leader.transferLeadership(mockTransferCohort);
2195 verify(mockTransferCohort, never()).transferComplete();
2197 // Send heartbeats to time out the transfer.
2198 for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2199 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2200 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2201 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2204 verify(mockTransferCohort).abortTransfer();
2205 verify(mockTransferCohort, never()).transferComplete();
2206 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2210 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2211 ActorRef actorRef, RaftRPC rpc) throws Exception {
2212 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2213 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2216 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2218 private final long electionTimeOutIntervalMillis;
2219 private final int snapshotChunkSize;
2221 MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2223 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2224 this.snapshotChunkSize = snapshotChunkSize;
2228 public FiniteDuration getElectionTimeOutInterval() {
2229 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2233 public int getSnapshotChunkSize() {
2234 return snapshotChunkSize;