2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.testkit.JavaTestKit;
26 import akka.testkit.TestActorRef;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.io.ByteSource;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import com.google.protobuf.ByteString;
31 import java.io.IOException;
32 import java.util.Arrays;
33 import java.util.Collections;
34 import java.util.HashMap;
35 import java.util.List;
37 import java.util.concurrent.TimeUnit;
38 import org.junit.After;
39 import org.junit.Test;
40 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
41 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
42 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
43 import org.opendaylight.controller.cluster.raft.RaftActorContext;
44 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
45 import org.opendaylight.controller.cluster.raft.RaftState;
46 import org.opendaylight.controller.cluster.raft.RaftVersions;
47 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
48 import org.opendaylight.controller.cluster.raft.VotingState;
49 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
50 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
51 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
52 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
54 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
55 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
56 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
58 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
60 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
61 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
62 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
63 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
64 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
65 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
66 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
67 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
68 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
69 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
70 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
71 import org.opendaylight.yangtools.concepts.Identifier;
72 import scala.concurrent.duration.FiniteDuration;
74 public class LeaderTest extends AbstractLeaderTest<Leader> {
76 static final String FOLLOWER_ID = "follower";
77 public static final String LEADER_ID = "leader";
79 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
80 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
82 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
83 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
85 private Leader leader;
86 private final short payloadVersion = 5;
90 public void tearDown() throws Exception {
99 public void testHandleMessageForUnknownMessage() throws Exception {
100 logStart("testHandleMessageForUnknownMessage");
102 leader = new Leader(createActorContext());
104 // handle message should null when it receives an unknown message
105 assertNull(leader.handleMessage(followerActor, "foo"));
109 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
110 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
112 MockRaftActorContext actorContext = createActorContextWithFollower();
113 actorContext.setCommitIndex(-1);
114 actorContext.setPayloadVersion(payloadVersion);
117 actorContext.getTermInformation().update(term, "");
119 leader = new Leader(actorContext);
120 actorContext.setCurrentBehavior(leader);
122 // Leader should send an immediate heartbeat with no entries as follower is inactive.
123 final long lastIndex = actorContext.getReplicatedLog().lastIndex();
124 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
125 assertEquals("getTerm", term, appendEntries.getTerm());
126 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
127 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
128 assertEquals("Entries size", 0, appendEntries.getEntries().size());
129 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
131 // The follower would normally reply - simulate that explicitly here.
132 leader.handleMessage(followerActor, new AppendEntriesReply(
133 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
134 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
136 followerActor.underlyingActor().clear();
138 // Sleep for the heartbeat interval so AppendEntries is sent.
139 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
140 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
142 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
144 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
145 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
146 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
147 assertEquals("Entries size", 1, appendEntries.getEntries().size());
148 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
149 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
150 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
154 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
155 return sendReplicate(actorContext, 1, index);
158 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
159 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
160 SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
161 actorContext.getReplicatedLog().append(newEntry);
162 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
166 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
167 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
169 MockRaftActorContext actorContext = createActorContextWithFollower();
172 actorContext.getTermInformation().update(term, "");
174 leader = new Leader(actorContext);
176 // Leader will send an immediate heartbeat - ignore it.
177 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
179 // The follower would normally reply - simulate that explicitly here.
180 long lastIndex = actorContext.getReplicatedLog().lastIndex();
181 leader.handleMessage(followerActor, new AppendEntriesReply(
182 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
183 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
185 followerActor.underlyingActor().clear();
187 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
189 // State should not change
190 assertTrue(raftBehavior instanceof Leader);
192 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
193 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
194 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
195 assertEquals("Entries size", 1, appendEntries.getEntries().size());
196 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
197 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
198 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
199 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
203 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
204 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
206 MockRaftActorContext actorContext = createActorContextWithFollower();
207 actorContext.setCommitIndex(-1);
208 actorContext.setLastApplied(-1);
210 // The raft context is initialized with a couple log entries. However the commitIndex
211 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
212 // committed and applied. Now it regains leadership with a higher term (2).
213 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
214 long newTerm = prevTerm + 1;
215 actorContext.getTermInformation().update(newTerm, "");
217 leader = new Leader(actorContext);
218 actorContext.setCurrentBehavior(leader);
220 // Leader will send an immediate heartbeat - ignore it.
221 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
223 // The follower replies with the leader's current last index and term, simulating that it is
224 // up to date with the leader.
225 long lastIndex = actorContext.getReplicatedLog().lastIndex();
226 leader.handleMessage(followerActor, new AppendEntriesReply(
227 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
229 // The commit index should not get updated even though consensus was reached. This is b/c the
230 // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
231 // from previous terms by counting replicas".
232 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
234 followerActor.underlyingActor().clear();
236 // Now replicate a new entry with the new term 2.
237 long newIndex = lastIndex + 1;
238 sendReplicate(actorContext, newTerm, newIndex);
240 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
241 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
242 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
243 assertEquals("Entries size", 1, appendEntries.getEntries().size());
244 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
245 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
246 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
248 // The follower replies with success. The leader should now update the commit index to the new index
249 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
250 // prior entries are committed indirectly".
251 leader.handleMessage(followerActor, new AppendEntriesReply(
252 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
254 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
258 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
259 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
261 MockRaftActorContext actorContext = createActorContextWithFollower();
262 actorContext.setRaftPolicy(createRaftPolicy(true, true));
265 actorContext.getTermInformation().update(term, "");
267 leader = new Leader(actorContext);
269 // Leader will send an immediate heartbeat - ignore it.
270 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
272 // The follower would normally reply - simulate that explicitly here.
273 long lastIndex = actorContext.getReplicatedLog().lastIndex();
274 leader.handleMessage(followerActor, new AppendEntriesReply(
275 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
276 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
278 followerActor.underlyingActor().clear();
280 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
282 // State should not change
283 assertTrue(raftBehavior instanceof Leader);
285 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
286 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
287 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
288 assertEquals("Entries size", 1, appendEntries.getEntries().size());
289 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
290 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
291 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
292 assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
296 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
297 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
299 MockRaftActorContext actorContext = createActorContextWithFollower();
300 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
302 public FiniteDuration getHeartBeatInterval() {
303 return FiniteDuration.apply(5, TimeUnit.SECONDS);
308 actorContext.getTermInformation().update(term, "");
310 leader = new Leader(actorContext);
312 // Leader will send an immediate heartbeat - ignore it.
313 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
315 // The follower would normally reply - simulate that explicitly here.
316 long lastIndex = actorContext.getReplicatedLog().lastIndex();
317 leader.handleMessage(followerActor, new AppendEntriesReply(
318 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
319 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
321 followerActor.underlyingActor().clear();
323 for (int i = 0; i < 5; i++) {
324 sendReplicate(actorContext, lastIndex + i + 1);
327 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
328 // We expect only 1 message to be sent because of two reasons,
329 // - an append entries reply was not received
330 // - the heartbeat interval has not expired
331 // In this scenario if multiple messages are sent they would likely be duplicates
332 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
336 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
337 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
339 MockRaftActorContext actorContext = createActorContextWithFollower();
340 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
342 public FiniteDuration getHeartBeatInterval() {
343 return FiniteDuration.apply(5, TimeUnit.SECONDS);
348 actorContext.getTermInformation().update(term, "");
350 leader = new Leader(actorContext);
352 // Leader will send an immediate heartbeat - ignore it.
353 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
355 // The follower would normally reply - simulate that explicitly here.
356 long lastIndex = actorContext.getReplicatedLog().lastIndex();
357 leader.handleMessage(followerActor, new AppendEntriesReply(
358 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
359 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
361 followerActor.underlyingActor().clear();
363 for (int i = 0; i < 3; i++) {
364 sendReplicate(actorContext, lastIndex + i + 1);
365 leader.handleMessage(followerActor, new AppendEntriesReply(
366 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
370 for (int i = 3; i < 5; i++) {
371 sendReplicate(actorContext, lastIndex + i + 1);
374 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
375 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
376 // get sent to the follower - but not the 5th
377 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
379 for (int i = 0; i < 4; i++) {
380 long expected = allMessages.get(i).getEntries().get(0).getIndex();
381 assertEquals(expected, i + 2);
386 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
387 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
389 MockRaftActorContext actorContext = createActorContextWithFollower();
390 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
392 public FiniteDuration getHeartBeatInterval() {
393 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
398 actorContext.getTermInformation().update(term, "");
400 leader = new Leader(actorContext);
402 // Leader will send an immediate heartbeat - ignore it.
403 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
405 // The follower would normally reply - simulate that explicitly here.
406 long lastIndex = actorContext.getReplicatedLog().lastIndex();
407 leader.handleMessage(followerActor, new AppendEntriesReply(
408 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
409 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
411 followerActor.underlyingActor().clear();
413 sendReplicate(actorContext, lastIndex + 1);
415 // Wait slightly longer than heartbeat duration
416 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
418 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
420 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
421 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
423 assertEquals(1, allMessages.get(0).getEntries().size());
424 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
425 assertEquals(1, allMessages.get(1).getEntries().size());
426 assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
431 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
432 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
434 MockRaftActorContext actorContext = createActorContextWithFollower();
435 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
437 public FiniteDuration getHeartBeatInterval() {
438 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
443 actorContext.getTermInformation().update(term, "");
445 leader = new Leader(actorContext);
447 // Leader will send an immediate heartbeat - ignore it.
448 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
450 // The follower would normally reply - simulate that explicitly here.
451 long lastIndex = actorContext.getReplicatedLog().lastIndex();
452 leader.handleMessage(followerActor, new AppendEntriesReply(
453 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
454 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
456 followerActor.underlyingActor().clear();
458 for (int i = 0; i < 3; i++) {
459 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
460 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
463 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
464 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
468 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
469 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
471 MockRaftActorContext actorContext = createActorContextWithFollower();
472 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
474 public FiniteDuration getHeartBeatInterval() {
475 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
480 actorContext.getTermInformation().update(term, "");
482 leader = new Leader(actorContext);
484 // Leader will send an immediate heartbeat - ignore it.
485 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
487 // The follower would normally reply - simulate that explicitly here.
488 long lastIndex = actorContext.getReplicatedLog().lastIndex();
489 leader.handleMessage(followerActor, new AppendEntriesReply(
490 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
491 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
493 followerActor.underlyingActor().clear();
495 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
496 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
497 sendReplicate(actorContext, lastIndex + 1);
499 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
500 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
502 assertEquals(0, allMessages.get(0).getEntries().size());
503 assertEquals(1, allMessages.get(1).getEntries().size());
508 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
509 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
511 MockRaftActorContext actorContext = createActorContext();
513 leader = new Leader(actorContext);
515 actorContext.setLastApplied(0);
517 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
518 long term = actorContext.getTermInformation().getCurrentTerm();
519 ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
520 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
522 actorContext.getReplicatedLog().append(newEntry);
524 final Identifier id = new MockIdentifier("state-id");
525 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
526 new Replicate(leaderActor, id, newEntry, true));
528 // State should not change
529 assertTrue(raftBehavior instanceof Leader);
531 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
533 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
534 // one since lastApplied state is 0.
535 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
536 leaderActor, ApplyState.class);
537 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
539 for (int i = 0; i <= newLogIndex - 1; i++) {
540 ApplyState applyState = applyStateList.get(i);
541 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
542 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
545 ApplyState last = applyStateList.get((int) newLogIndex - 1);
546 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
547 assertEquals("getIdentifier", id, last.getIdentifier());
551 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
552 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
554 final MockRaftActorContext actorContext = createActorContextWithFollower();
556 Map<String, String> leadersSnapshot = new HashMap<>();
557 leadersSnapshot.put("1", "A");
558 leadersSnapshot.put("2", "B");
559 leadersSnapshot.put("3", "C");
562 actorContext.getReplicatedLog().removeFrom(0);
564 final int commitIndex = 3;
565 final int snapshotIndex = 2;
566 final int snapshotTerm = 1;
568 // set the snapshot variables in replicatedlog
569 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
570 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
571 actorContext.setCommitIndex(commitIndex);
572 //set follower timeout to 2 mins, helps during debugging
573 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
575 leader = new Leader(actorContext);
577 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
578 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
580 //update follower timestamp
581 leader.markFollowerActive(FOLLOWER_ID);
583 ByteString bs = toByteString(leadersSnapshot);
584 leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
585 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
586 -1, null, null), ByteSource.wrap(bs.toByteArray())));
587 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
588 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
589 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
590 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
592 //send first chunk and no InstallSnapshotReply received yet
594 fts.incrementChunkIndex();
596 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
597 TimeUnit.MILLISECONDS);
599 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
601 AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
603 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
605 //InstallSnapshotReply received
606 fts.markSendStatus(true);
608 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
610 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
612 assertEquals(commitIndex, is.getLastIncludedIndex());
616 public void testSendAppendEntriesSnapshotScenario() throws Exception {
617 logStart("testSendAppendEntriesSnapshotScenario");
619 final MockRaftActorContext actorContext = createActorContextWithFollower();
621 Map<String, String> leadersSnapshot = new HashMap<>();
622 leadersSnapshot.put("1", "A");
623 leadersSnapshot.put("2", "B");
624 leadersSnapshot.put("3", "C");
627 actorContext.getReplicatedLog().removeFrom(0);
629 final int followersLastIndex = 2;
630 final int snapshotIndex = 3;
631 final int newEntryIndex = 4;
632 final int snapshotTerm = 1;
633 final int currentTerm = 2;
635 // set the snapshot variables in replicatedlog
636 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
637 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
638 actorContext.setCommitIndex(followersLastIndex);
640 leader = new Leader(actorContext);
642 // Leader will send an immediate heartbeat - ignore it.
643 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
646 SimpleReplicatedLogEntry entry =
647 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
648 new MockRaftActorContext.MockPayload("D"));
650 actorContext.getReplicatedLog().append(entry);
652 //update follower timestamp
653 leader.markFollowerActive(FOLLOWER_ID);
655 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
656 RaftActorBehavior raftBehavior = leader.handleMessage(
657 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
659 assertTrue(raftBehavior instanceof Leader);
661 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
665 public void testInitiateInstallSnapshot() throws Exception {
666 logStart("testInitiateInstallSnapshot");
668 MockRaftActorContext actorContext = createActorContextWithFollower();
671 actorContext.getReplicatedLog().removeFrom(0);
673 final int followersLastIndex = 2;
674 final int snapshotIndex = 3;
675 final int newEntryIndex = 4;
676 final int snapshotTerm = 1;
677 final int currentTerm = 2;
679 // set the snapshot variables in replicatedlog
680 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
681 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
682 actorContext.setLastApplied(3);
683 actorContext.setCommitIndex(followersLastIndex);
685 leader = new Leader(actorContext);
687 // Leader will send an immediate heartbeat - ignore it.
688 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
690 // set the snapshot as absent and check if capture-snapshot is invoked.
691 leader.setSnapshot(null);
694 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
695 new MockRaftActorContext.MockPayload("D"));
697 actorContext.getReplicatedLog().append(entry);
699 //update follower timestamp
700 leader.markFollowerActive(FOLLOWER_ID);
702 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
704 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
706 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
708 assertEquals(3, cs.getLastAppliedIndex());
709 assertEquals(1, cs.getLastAppliedTerm());
710 assertEquals(4, cs.getLastIndex());
711 assertEquals(2, cs.getLastTerm());
713 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
714 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
716 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
720 public void testInitiateForceInstallSnapshot() throws Exception {
721 logStart("testInitiateForceInstallSnapshot");
723 MockRaftActorContext actorContext = createActorContextWithFollower();
725 final int followersLastIndex = 2;
726 final int snapshotIndex = -1;
727 final int newEntryIndex = 4;
728 final int snapshotTerm = -1;
729 final int currentTerm = 2;
731 // set the snapshot variables in replicatedlog
732 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
733 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
734 actorContext.setLastApplied(3);
735 actorContext.setCommitIndex(followersLastIndex);
737 actorContext.getReplicatedLog().removeFrom(0);
739 leader = new Leader(actorContext);
740 actorContext.setCurrentBehavior(leader);
742 // Leader will send an immediate heartbeat - ignore it.
743 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
745 // set the snapshot as absent and check if capture-snapshot is invoked.
746 leader.setSnapshot(null);
748 for (int i = 0; i < 4; i++) {
749 actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
750 new MockRaftActorContext.MockPayload("X" + i)));
754 SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
755 new MockRaftActorContext.MockPayload("D"));
757 actorContext.getReplicatedLog().append(entry);
759 //update follower timestamp
760 leader.markFollowerActive(FOLLOWER_ID);
762 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
763 // installed with a SendInstallSnapshot
764 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
766 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
768 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
770 assertEquals(3, cs.getLastAppliedIndex());
771 assertEquals(1, cs.getLastAppliedTerm());
772 assertEquals(4, cs.getLastIndex());
773 assertEquals(2, cs.getLastTerm());
775 // if an initiate is started again when first is in progress, it should not initiate Capture
776 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
778 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
783 public void testInstallSnapshot() throws Exception {
784 logStart("testInstallSnapshot");
786 final MockRaftActorContext actorContext = createActorContextWithFollower();
788 Map<String, String> leadersSnapshot = new HashMap<>();
789 leadersSnapshot.put("1", "A");
790 leadersSnapshot.put("2", "B");
791 leadersSnapshot.put("3", "C");
794 actorContext.getReplicatedLog().removeFrom(0);
796 final int lastAppliedIndex = 3;
797 final int snapshotIndex = 2;
798 final int snapshotTerm = 1;
799 final int currentTerm = 2;
801 // set the snapshot variables in replicatedlog
802 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
803 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
804 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
805 actorContext.setCommitIndex(lastAppliedIndex);
806 actorContext.setLastApplied(lastAppliedIndex);
808 leader = new Leader(actorContext);
810 // Initial heartbeat.
811 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
813 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
814 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
816 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
817 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
818 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
820 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
821 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
823 assertTrue(raftBehavior instanceof Leader);
825 // check if installsnapshot gets called with the correct values.
827 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
828 InstallSnapshot.class);
830 assertNotNull(installSnapshot.getData());
831 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
832 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
834 assertEquals(currentTerm, installSnapshot.getTerm());
838 public void testForceInstallSnapshot() throws Exception {
839 logStart("testForceInstallSnapshot");
841 final MockRaftActorContext actorContext = createActorContextWithFollower();
843 Map<String, String> leadersSnapshot = new HashMap<>();
844 leadersSnapshot.put("1", "A");
845 leadersSnapshot.put("2", "B");
846 leadersSnapshot.put("3", "C");
848 final int lastAppliedIndex = 3;
849 final int snapshotIndex = -1;
850 final int snapshotTerm = -1;
851 final int currentTerm = 2;
853 // set the snapshot variables in replicatedlog
854 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
855 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
856 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
857 actorContext.setCommitIndex(lastAppliedIndex);
858 actorContext.setLastApplied(lastAppliedIndex);
860 leader = new Leader(actorContext);
862 // Initial heartbeat.
863 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
865 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
866 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
868 byte[] bytes = toByteString(leadersSnapshot).toByteArray();
869 Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
870 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
872 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
873 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
875 assertTrue(raftBehavior instanceof Leader);
877 // check if installsnapshot gets called with the correct values.
879 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
880 InstallSnapshot.class);
882 assertNotNull(installSnapshot.getData());
883 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
884 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
886 assertEquals(currentTerm, installSnapshot.getTerm());
890 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
891 logStart("testHandleInstallSnapshotReplyLastChunk");
893 MockRaftActorContext actorContext = createActorContextWithFollower();
895 final int commitIndex = 3;
896 final int snapshotIndex = 2;
897 final int snapshotTerm = 1;
898 final int currentTerm = 2;
900 actorContext.setCommitIndex(commitIndex);
902 leader = new Leader(actorContext);
903 actorContext.setCurrentBehavior(leader);
905 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
906 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
908 // Ignore initial heartbeat.
909 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
911 Map<String, String> leadersSnapshot = new HashMap<>();
912 leadersSnapshot.put("1", "A");
913 leadersSnapshot.put("2", "B");
914 leadersSnapshot.put("3", "C");
916 // set the snapshot variables in replicatedlog
918 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
919 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
920 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
922 ByteString bs = toByteString(leadersSnapshot);
923 leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
924 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
925 -1, null, null), ByteSource.wrap(bs.toByteArray())));
926 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
927 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
928 fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
929 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
930 while (!fts.isLastChunk(fts.getChunkIndex())) {
932 fts.incrementChunkIndex();
936 actorContext.getReplicatedLog().removeFrom(0);
938 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
939 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
941 assertTrue(raftBehavior instanceof Leader);
943 assertEquals(1, leader.followerLogSize());
944 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
946 assertNull(fli.getInstallSnapshotState());
947 assertEquals(commitIndex, fli.getMatchIndex());
948 assertEquals(commitIndex + 1, fli.getNextIndex());
949 assertFalse(leader.hasSnapshot());
953 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
954 logStart("testSendSnapshotfromInstallSnapshotReply");
956 MockRaftActorContext actorContext = createActorContextWithFollower();
958 final int commitIndex = 3;
959 final int snapshotIndex = 2;
960 final int snapshotTerm = 1;
961 final int currentTerm = 2;
963 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
965 public int getSnapshotChunkSize() {
969 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
970 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
972 actorContext.setConfigParams(configParams);
973 actorContext.setCommitIndex(commitIndex);
975 leader = new Leader(actorContext);
976 actorContext.setCurrentBehavior(leader);
978 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
979 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
981 Map<String, String> leadersSnapshot = new HashMap<>();
982 leadersSnapshot.put("1", "A");
983 leadersSnapshot.put("2", "B");
984 leadersSnapshot.put("3", "C");
986 // set the snapshot variables in replicatedlog
987 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
988 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
989 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
991 ByteString bs = toByteString(leadersSnapshot);
992 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
993 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
996 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
998 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
999 InstallSnapshot.class);
1001 assertEquals(1, installSnapshot.getChunkIndex());
1002 assertEquals(3, installSnapshot.getTotalChunks());
1004 followerActor.underlyingActor().clear();
1005 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1006 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1008 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1010 assertEquals(2, installSnapshot.getChunkIndex());
1011 assertEquals(3, installSnapshot.getTotalChunks());
1013 followerActor.underlyingActor().clear();
1014 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1015 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1017 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1019 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1020 followerActor.underlyingActor().clear();
1021 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1022 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1024 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1026 assertNull(installSnapshot);
1031 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
1032 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1034 MockRaftActorContext actorContext = createActorContextWithFollower();
1036 final int commitIndex = 3;
1037 final int snapshotIndex = 2;
1038 final int snapshotTerm = 1;
1039 final int currentTerm = 2;
1041 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1043 public int getSnapshotChunkSize() {
1048 actorContext.setCommitIndex(commitIndex);
1050 leader = new Leader(actorContext);
1052 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1053 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1055 Map<String, String> leadersSnapshot = new HashMap<>();
1056 leadersSnapshot.put("1", "A");
1057 leadersSnapshot.put("2", "B");
1058 leadersSnapshot.put("3", "C");
1060 // set the snapshot variables in replicatedlog
1061 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1062 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1063 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1065 ByteString bs = toByteString(leadersSnapshot);
1066 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1067 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1070 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1071 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1073 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1074 InstallSnapshot.class);
1076 assertEquals(1, installSnapshot.getChunkIndex());
1077 assertEquals(3, installSnapshot.getTotalChunks());
1079 followerActor.underlyingActor().clear();
1081 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1082 FOLLOWER_ID, -1, false));
1084 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1085 TimeUnit.MILLISECONDS);
1087 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1089 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1091 assertEquals(1, installSnapshot.getChunkIndex());
1092 assertEquals(3, installSnapshot.getTotalChunks());
1096 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1097 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1099 MockRaftActorContext actorContext = createActorContextWithFollower();
1101 final int commitIndex = 3;
1102 final int snapshotIndex = 2;
1103 final int snapshotTerm = 1;
1104 final int currentTerm = 2;
1106 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1108 public int getSnapshotChunkSize() {
1113 actorContext.setCommitIndex(commitIndex);
1115 leader = new Leader(actorContext);
1117 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1118 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1120 Map<String, String> leadersSnapshot = new HashMap<>();
1121 leadersSnapshot.put("1", "A");
1122 leadersSnapshot.put("2", "B");
1123 leadersSnapshot.put("3", "C");
1125 // set the snapshot variables in replicatedlog
1126 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1127 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1128 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1130 ByteString bs = toByteString(leadersSnapshot);
1131 Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1132 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1135 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1137 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1138 InstallSnapshot.class);
1140 assertEquals(1, installSnapshot.getChunkIndex());
1141 assertEquals(3, installSnapshot.getTotalChunks());
1142 assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1143 installSnapshot.getLastChunkHashCode().get().intValue());
1145 final int hashCode = Arrays.hashCode(installSnapshot.getData());
1147 followerActor.underlyingActor().clear();
1149 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1150 FOLLOWER_ID, 1, true));
1152 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1154 assertEquals(2, installSnapshot.getChunkIndex());
1155 assertEquals(3, installSnapshot.getTotalChunks());
1156 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1160 public void testLeaderInstallSnapshotState() throws IOException {
1161 logStart("testLeaderInstallSnapshotState");
1163 Map<String, String> leadersSnapshot = new HashMap<>();
1164 leadersSnapshot.put("1", "A");
1165 leadersSnapshot.put("2", "B");
1166 leadersSnapshot.put("3", "C");
1168 ByteString bs = toByteString(leadersSnapshot);
1169 byte[] barray = bs.toByteArray();
1171 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1172 fts.setSnapshotBytes(ByteSource.wrap(barray));
1174 assertEquals(bs.size(), barray.length);
1177 for (int i = 0; i < barray.length; i = i + 50) {
1178 int length = i + 50;
1181 if (i + 50 > barray.length) {
1182 length = barray.length;
1185 byte[] chunk = fts.getNextChunk();
1186 assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1187 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1189 fts.markSendStatus(true);
1190 if (!fts.isLastChunk(chunkIndex)) {
1191 fts.incrementChunkIndex();
1195 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1200 protected Leader createBehavior(final RaftActorContext actorContext) {
1201 return new Leader(actorContext);
1205 protected MockRaftActorContext createActorContext() {
1206 return createActorContext(leaderActor);
1210 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1211 return createActorContext(LEADER_ID, actorRef);
1214 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1215 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1216 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1217 configParams.setElectionTimeoutFactor(100000);
1218 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1219 context.setConfigParams(configParams);
1220 context.setPayloadVersion(payloadVersion);
1224 private MockRaftActorContext createActorContextWithFollower() {
1225 MockRaftActorContext actorContext = createActorContext();
1226 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1227 followerActor.path().toString()).build());
1228 return actorContext;
1231 private MockRaftActorContext createFollowerActorContextWithLeader() {
1232 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1233 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1234 followerConfig.setElectionTimeoutFactor(10000);
1235 followerActorContext.setConfigParams(followerConfig);
1236 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1237 return followerActorContext;
1241 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1242 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1244 final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1246 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1248 Follower follower = new Follower(followerActorContext);
1249 followerActor.underlyingActor().setBehavior(follower);
1250 followerActorContext.setCurrentBehavior(follower);
1252 Map<String, String> peerAddresses = new HashMap<>();
1253 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1255 leaderActorContext.setPeerAddresses(peerAddresses);
1257 leaderActorContext.getReplicatedLog().removeFrom(0);
1260 leaderActorContext.setReplicatedLog(
1261 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1263 leaderActorContext.setCommitIndex(1);
1265 followerActorContext.getReplicatedLog().removeFrom(0);
1267 // follower too has the exact same log entries and has the same commit index
1268 followerActorContext.setReplicatedLog(
1269 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1271 followerActorContext.setCommitIndex(1);
1273 leader = new Leader(leaderActorContext);
1274 leaderActorContext.setCurrentBehavior(leader);
1276 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1278 assertEquals(-1, appendEntries.getLeaderCommit());
1279 assertEquals(0, appendEntries.getEntries().size());
1280 assertEquals(0, appendEntries.getPrevLogIndex());
1282 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1283 leaderActor, AppendEntriesReply.class);
1285 assertEquals(2, appendEntriesReply.getLogLastIndex());
1286 assertEquals(1, appendEntriesReply.getLogLastTerm());
1288 // follower returns its next index
1289 assertEquals(2, appendEntriesReply.getLogLastIndex());
1290 assertEquals(1, appendEntriesReply.getLogLastTerm());
1296 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1297 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1299 final MockRaftActorContext leaderActorContext = createActorContext();
1301 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1302 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1304 Follower follower = new Follower(followerActorContext);
1305 followerActor.underlyingActor().setBehavior(follower);
1306 followerActorContext.setCurrentBehavior(follower);
1308 Map<String, String> leaderPeerAddresses = new HashMap<>();
1309 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1311 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1313 leaderActorContext.getReplicatedLog().removeFrom(0);
1315 leaderActorContext.setReplicatedLog(
1316 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1318 leaderActorContext.setCommitIndex(1);
1320 followerActorContext.getReplicatedLog().removeFrom(0);
1322 followerActorContext.setReplicatedLog(
1323 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1325 // follower has the same log entries but its commit index > leaders commit index
1326 followerActorContext.setCommitIndex(2);
1328 leader = new Leader(leaderActorContext);
1330 // Initial heartbeat
1331 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1333 assertEquals(-1, appendEntries.getLeaderCommit());
1334 assertEquals(0, appendEntries.getEntries().size());
1335 assertEquals(0, appendEntries.getPrevLogIndex());
1337 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1338 leaderActor, AppendEntriesReply.class);
1340 assertEquals(2, appendEntriesReply.getLogLastIndex());
1341 assertEquals(1, appendEntriesReply.getLogLastTerm());
1343 leaderActor.underlyingActor().setBehavior(follower);
1344 leader.handleMessage(followerActor, appendEntriesReply);
1346 leaderActor.underlyingActor().clear();
1347 followerActor.underlyingActor().clear();
1349 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1350 TimeUnit.MILLISECONDS);
1352 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1354 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1356 assertEquals(2, appendEntries.getLeaderCommit());
1357 assertEquals(0, appendEntries.getEntries().size());
1358 assertEquals(2, appendEntries.getPrevLogIndex());
1360 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1362 assertEquals(2, appendEntriesReply.getLogLastIndex());
1363 assertEquals(1, appendEntriesReply.getLogLastTerm());
1365 assertEquals(2, followerActorContext.getCommitIndex());
1371 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1372 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1374 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1375 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1376 new FiniteDuration(1000, TimeUnit.SECONDS));
1378 leaderActorContext.setReplicatedLog(
1379 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1380 long leaderCommitIndex = 2;
1381 leaderActorContext.setCommitIndex(leaderCommitIndex);
1382 leaderActorContext.setLastApplied(leaderCommitIndex);
1384 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1385 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1387 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1389 followerActorContext.setReplicatedLog(
1390 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1391 followerActorContext.setCommitIndex(0);
1392 followerActorContext.setLastApplied(0);
1394 Follower follower = new Follower(followerActorContext);
1395 followerActor.underlyingActor().setBehavior(follower);
1397 leader = new Leader(leaderActorContext);
1399 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1400 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1401 AppendEntriesReply.class);
1403 MessageCollectorActor.clearMessages(followerActor);
1404 MessageCollectorActor.clearMessages(leaderActor);
1406 // Verify initial AppendEntries sent.
1407 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1408 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1409 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1411 leaderActor.underlyingActor().setBehavior(leader);
1413 leader.handleMessage(followerActor, appendEntriesReply);
1415 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1416 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1418 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1419 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1420 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1422 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1423 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1424 appendEntries.getEntries().get(0).getData());
1425 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1426 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1427 appendEntries.getEntries().get(1).getData());
1429 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1430 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1432 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1434 ApplyState applyState = applyStateList.get(0);
1435 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1436 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1437 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1438 applyState.getReplicatedLogEntry().getData());
1440 applyState = applyStateList.get(1);
1441 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1442 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1443 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1444 applyState.getReplicatedLogEntry().getData());
1446 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1447 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1451 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1452 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1454 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1455 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1456 new FiniteDuration(1000, TimeUnit.SECONDS));
1458 leaderActorContext.setReplicatedLog(
1459 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1460 long leaderCommitIndex = 1;
1461 leaderActorContext.setCommitIndex(leaderCommitIndex);
1462 leaderActorContext.setLastApplied(leaderCommitIndex);
1464 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1465 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1467 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1469 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1470 followerActorContext.setCommitIndex(-1);
1471 followerActorContext.setLastApplied(-1);
1473 Follower follower = new Follower(followerActorContext);
1474 followerActor.underlyingActor().setBehavior(follower);
1475 followerActorContext.setCurrentBehavior(follower);
1477 leader = new Leader(leaderActorContext);
1479 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1480 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1481 AppendEntriesReply.class);
1483 MessageCollectorActor.clearMessages(followerActor);
1484 MessageCollectorActor.clearMessages(leaderActor);
1486 // Verify initial AppendEntries sent with the leader's current commit index.
1487 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1488 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1489 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1491 leaderActor.underlyingActor().setBehavior(leader);
1492 leaderActorContext.setCurrentBehavior(leader);
1494 leader.handleMessage(followerActor, appendEntriesReply);
1496 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1497 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1499 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1500 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1501 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1503 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1504 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1505 appendEntries.getEntries().get(0).getData());
1506 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1507 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1508 appendEntries.getEntries().get(1).getData());
1510 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1511 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1513 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1515 ApplyState applyState = applyStateList.get(0);
1516 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1517 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1518 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1519 applyState.getReplicatedLogEntry().getData());
1521 applyState = applyStateList.get(1);
1522 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1523 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1524 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1525 applyState.getReplicatedLogEntry().getData());
1527 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1528 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1532 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1533 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1535 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1536 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1537 new FiniteDuration(1000, TimeUnit.SECONDS));
1539 leaderActorContext.setReplicatedLog(
1540 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1541 long leaderCommitIndex = 1;
1542 leaderActorContext.setCommitIndex(leaderCommitIndex);
1543 leaderActorContext.setLastApplied(leaderCommitIndex);
1545 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1546 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1548 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1550 followerActorContext.setReplicatedLog(
1551 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1552 followerActorContext.setCommitIndex(-1);
1553 followerActorContext.setLastApplied(-1);
1555 Follower follower = new Follower(followerActorContext);
1556 followerActor.underlyingActor().setBehavior(follower);
1557 followerActorContext.setCurrentBehavior(follower);
1559 leader = new Leader(leaderActorContext);
1561 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1562 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1563 AppendEntriesReply.class);
1565 MessageCollectorActor.clearMessages(followerActor);
1566 MessageCollectorActor.clearMessages(leaderActor);
1568 // Verify initial AppendEntries sent with the leader's current commit index.
1569 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1570 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1571 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1573 leaderActor.underlyingActor().setBehavior(leader);
1574 leaderActorContext.setCurrentBehavior(leader);
1576 leader.handleMessage(followerActor, appendEntriesReply);
1578 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1579 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1581 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1582 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1583 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1585 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1586 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1587 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1588 appendEntries.getEntries().get(0).getData());
1589 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1590 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1591 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1592 appendEntries.getEntries().get(1).getData());
1594 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1595 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1597 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1599 ApplyState applyState = applyStateList.get(0);
1600 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1601 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1602 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1603 applyState.getReplicatedLogEntry().getData());
1605 applyState = applyStateList.get(1);
1606 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1607 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1608 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1609 applyState.getReplicatedLogEntry().getData());
1611 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1612 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1613 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1617 public void testHandleAppendEntriesReplyWithNewerTerm() {
1618 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1620 MockRaftActorContext leaderActorContext = createActorContext();
1621 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1622 new FiniteDuration(10000, TimeUnit.SECONDS));
1624 leaderActorContext.setReplicatedLog(
1625 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1627 leader = new Leader(leaderActorContext);
1628 leaderActor.underlyingActor().setBehavior(leader);
1629 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1631 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1632 AppendEntriesReply.class);
1634 assertEquals(false, appendEntriesReply.isSuccess());
1635 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1637 MessageCollectorActor.clearMessages(leaderActor);
1641 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1642 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1644 MockRaftActorContext leaderActorContext = createActorContext();
1645 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1646 new FiniteDuration(10000, TimeUnit.SECONDS));
1648 leaderActorContext.setReplicatedLog(
1649 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1650 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1652 leader = new Leader(leaderActorContext);
1653 leaderActor.underlyingActor().setBehavior(leader);
1654 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1656 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1657 AppendEntriesReply.class);
1659 assertEquals(false, appendEntriesReply.isSuccess());
1660 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1662 MessageCollectorActor.clearMessages(leaderActor);
1666 public void testHandleAppendEntriesReplySuccess() throws Exception {
1667 logStart("testHandleAppendEntriesReplySuccess");
1669 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1671 leaderActorContext.setReplicatedLog(
1672 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1674 leaderActorContext.setCommitIndex(1);
1675 leaderActorContext.setLastApplied(1);
1676 leaderActorContext.getTermInformation().update(1, "leader");
1678 leader = new Leader(leaderActorContext);
1680 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1682 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1683 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1685 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1687 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1689 assertEquals(RaftState.Leader, raftActorBehavior.state());
1691 assertEquals(2, leaderActorContext.getCommitIndex());
1693 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1694 leaderActor, ApplyJournalEntries.class);
1696 assertEquals(2, leaderActorContext.getLastApplied());
1698 assertEquals(2, applyJournalEntries.getToIndex());
1700 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1703 assertEquals(1,applyStateList.size());
1705 ApplyState applyState = applyStateList.get(0);
1707 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1709 assertEquals(2, followerInfo.getMatchIndex());
1710 assertEquals(3, followerInfo.getNextIndex());
1711 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1712 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1716 public void testHandleAppendEntriesReplyUnknownFollower() {
1717 logStart("testHandleAppendEntriesReplyUnknownFollower");
1719 MockRaftActorContext leaderActorContext = createActorContext();
1721 leader = new Leader(leaderActorContext);
1723 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1725 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1727 assertEquals(RaftState.Leader, raftActorBehavior.state());
1731 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1732 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1734 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1735 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1736 new FiniteDuration(1000, TimeUnit.SECONDS));
1737 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1739 leaderActorContext.setReplicatedLog(
1740 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1741 long leaderCommitIndex = 3;
1742 leaderActorContext.setCommitIndex(leaderCommitIndex);
1743 leaderActorContext.setLastApplied(leaderCommitIndex);
1745 final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1746 final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1747 final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1748 final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1750 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1752 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1753 followerActorContext.setCommitIndex(-1);
1754 followerActorContext.setLastApplied(-1);
1756 Follower follower = new Follower(followerActorContext);
1757 followerActor.underlyingActor().setBehavior(follower);
1758 followerActorContext.setCurrentBehavior(follower);
1760 leader = new Leader(leaderActorContext);
1762 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1763 final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1764 AppendEntriesReply.class);
1766 MessageCollectorActor.clearMessages(followerActor);
1767 MessageCollectorActor.clearMessages(leaderActor);
1769 // Verify initial AppendEntries sent with the leader's current commit index.
1770 assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1771 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1772 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1774 leaderActor.underlyingActor().setBehavior(leader);
1775 leaderActorContext.setCurrentBehavior(leader);
1777 leader.handleMessage(followerActor, appendEntriesReply);
1779 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1780 AppendEntries.class, 2);
1781 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1783 appendEntries = appendEntriesList.get(0);
1784 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1785 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1786 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1788 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1789 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1790 appendEntries.getEntries().get(0).getData());
1791 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1792 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1793 appendEntries.getEntries().get(1).getData());
1795 appendEntries = appendEntriesList.get(1);
1796 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1797 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1798 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1800 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1801 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1802 appendEntries.getEntries().get(0).getData());
1803 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1804 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1805 appendEntries.getEntries().get(1).getData());
1807 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1808 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1810 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1812 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1813 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1817 public void testHandleRequestVoteReply() {
1818 logStart("testHandleRequestVoteReply");
1820 MockRaftActorContext leaderActorContext = createActorContext();
1822 leader = new Leader(leaderActorContext);
1824 // Should be a no-op.
1825 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1826 new RequestVoteReply(1, true));
1828 assertEquals(RaftState.Leader, raftActorBehavior.state());
1830 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1832 assertEquals(RaftState.Leader, raftActorBehavior.state());
1836 public void testIsolatedLeaderCheckNoFollowers() {
1837 logStart("testIsolatedLeaderCheckNoFollowers");
1839 MockRaftActorContext leaderActorContext = createActorContext();
1841 leader = new Leader(leaderActorContext);
1842 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1843 assertTrue(newBehavior instanceof Leader);
1847 public void testIsolatedLeaderCheckNoVotingFollowers() {
1848 logStart("testIsolatedLeaderCheckNoVotingFollowers");
1850 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1851 Follower follower = new Follower(followerActorContext);
1852 followerActor.underlyingActor().setBehavior(follower);
1854 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1855 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1856 new FiniteDuration(1000, TimeUnit.SECONDS));
1857 leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1859 leader = new Leader(leaderActorContext);
1860 leader.getFollower(FOLLOWER_ID).markFollowerActive();
1861 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1862 assertTrue("Expected Leader", newBehavior instanceof Leader);
1865 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
1866 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1867 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1869 MockRaftActorContext leaderActorContext = createActorContext();
1871 Map<String, String> peerAddresses = new HashMap<>();
1872 peerAddresses.put("follower-1", followerActor1.path().toString());
1873 peerAddresses.put("follower-2", followerActor2.path().toString());
1875 leaderActorContext.setPeerAddresses(peerAddresses);
1876 leaderActorContext.setRaftPolicy(raftPolicy);
1878 leader = new Leader(leaderActorContext);
1880 leader.markFollowerActive("follower-1");
1881 leader.markFollowerActive("follower-2");
1882 RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1883 assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1885 // kill 1 follower and verify if that got killed
1886 final JavaTestKit probe = new JavaTestKit(getSystem());
1887 probe.watch(followerActor1);
1888 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1889 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1890 assertEquals(termMsg1.getActor(), followerActor1);
1892 leader.markFollowerInActive("follower-1");
1893 leader.markFollowerActive("follower-2");
1894 newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1895 assertTrue("Behavior not instance of Leader when majority of followers are active",
1896 newBehavior instanceof Leader);
1898 // kill 2nd follower and leader should change to Isolated leader
1899 followerActor2.tell(PoisonPill.getInstance(), null);
1900 probe.watch(followerActor2);
1901 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1902 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1903 assertEquals(termMsg2.getActor(), followerActor2);
1905 leader.markFollowerInActive("follower-2");
1906 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1910 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1911 logStart("testIsolatedLeaderCheckTwoFollowers");
1913 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1915 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1916 newBehavior instanceof IsolatedLeader);
1920 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1921 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1923 RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1925 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1926 newBehavior instanceof Leader);
1930 public void testLaggingFollowerStarvation() throws Exception {
1931 logStart("testLaggingFollowerStarvation");
1933 String leaderActorId = actorFactory.generateActorId("leader");
1934 String follower1ActorId = actorFactory.generateActorId("follower");
1935 String follower2ActorId = actorFactory.generateActorId("follower");
1937 final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1938 final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1940 MockRaftActorContext leaderActorContext =
1941 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1943 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1944 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1945 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1947 leaderActorContext.setConfigParams(configParams);
1949 leaderActorContext.setReplicatedLog(
1950 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1952 Map<String, String> peerAddresses = new HashMap<>();
1953 peerAddresses.put(follower1ActorId,
1954 follower1Actor.path().toString());
1955 peerAddresses.put(follower2ActorId,
1956 follower2Actor.path().toString());
1958 leaderActorContext.setPeerAddresses(peerAddresses);
1959 leaderActorContext.getTermInformation().update(1, leaderActorId);
1961 leader = createBehavior(leaderActorContext);
1963 leaderActor.underlyingActor().setBehavior(leader);
1965 for (int i = 1; i < 6; i++) {
1966 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1967 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
1968 new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1969 assertTrue(newBehavior == leader);
1970 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1973 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1974 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1976 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1977 heartbeats.size() > 1);
1979 // Check if follower-2 got AppendEntries during this time and was not starved
1980 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1982 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1983 appendEntries.size() > 1);
1987 public void testReplicationConsensusWithNonVotingFollower() {
1988 logStart("testReplicationConsensusWithNonVotingFollower");
1990 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1991 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1992 new FiniteDuration(1000, TimeUnit.SECONDS));
1994 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1995 leaderActorContext.setCommitIndex(-1);
1996 leaderActorContext.setLastApplied(-1);
1998 String nonVotingFollowerId = "nonvoting-follower";
1999 TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
2000 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
2002 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2003 VotingState.NON_VOTING);
2005 leader = new Leader(leaderActorContext);
2006 leaderActorContext.setCurrentBehavior(leader);
2008 // Ignore initial heartbeats
2009 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2010 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2012 MessageCollectorActor.clearMessages(followerActor);
2013 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2014 MessageCollectorActor.clearMessages(leaderActor);
2016 // Send a Replicate message and wait for AppendEntries.
2017 sendReplicate(leaderActorContext, 0);
2019 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2020 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2022 // Send reply only from the voting follower and verify consensus via ApplyState.
2023 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2025 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2027 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2029 MessageCollectorActor.clearMessages(followerActor);
2030 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2031 MessageCollectorActor.clearMessages(leaderActor);
2033 // Send another Replicate message
2034 sendReplicate(leaderActorContext, 1);
2036 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2037 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2038 AppendEntries.class);
2039 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2040 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2042 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2043 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2045 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2047 // Send reply from the voting follower and verify consensus.
2048 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2050 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2054 public void testTransferLeadershipWithFollowerInSync() {
2055 logStart("testTransferLeadershipWithFollowerInSync");
2057 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2058 leaderActorContext.setLastApplied(-1);
2059 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2060 new FiniteDuration(1000, TimeUnit.SECONDS));
2061 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2063 leader = new Leader(leaderActorContext);
2064 leaderActorContext.setCurrentBehavior(leader);
2066 // Initial heartbeat
2067 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2068 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2069 MessageCollectorActor.clearMessages(followerActor);
2071 sendReplicate(leaderActorContext, 0);
2072 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2074 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2075 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2076 MessageCollectorActor.clearMessages(followerActor);
2078 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2079 leader.transferLeadership(mockTransferCohort);
2081 verify(mockTransferCohort, never()).transferComplete();
2082 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2083 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2085 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2086 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2088 // Leader should force an election timeout
2089 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2091 verify(mockTransferCohort).transferComplete();
2095 public void testTransferLeadershipWithEmptyLog() {
2096 logStart("testTransferLeadershipWithEmptyLog");
2098 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2099 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2100 new FiniteDuration(1000, TimeUnit.SECONDS));
2101 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2103 leader = new Leader(leaderActorContext);
2104 leaderActorContext.setCurrentBehavior(leader);
2106 // Initial heartbeat
2107 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2108 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2109 MessageCollectorActor.clearMessages(followerActor);
2111 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2112 leader.transferLeadership(mockTransferCohort);
2114 verify(mockTransferCohort, never()).transferComplete();
2115 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2116 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2118 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2119 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2121 // Leader should force an election timeout
2122 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2124 verify(mockTransferCohort).transferComplete();
2128 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2129 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2131 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2132 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2133 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2135 leader = new Leader(leaderActorContext);
2136 leaderActorContext.setCurrentBehavior(leader);
2138 // Initial heartbeat
2139 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2140 MessageCollectorActor.clearMessages(followerActor);
2142 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2143 leader.transferLeadership(mockTransferCohort);
2145 verify(mockTransferCohort, never()).transferComplete();
2147 // Sync up the follower.
2148 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2149 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2150 MessageCollectorActor.clearMessages(followerActor);
2152 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2153 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2154 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2155 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2156 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2158 // Leader should force an election timeout
2159 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2161 verify(mockTransferCohort).transferComplete();
2165 public void testTransferLeadershipWithFollowerSyncTimeout() {
2166 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2168 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2169 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2170 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2171 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2172 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2174 leader = new Leader(leaderActorContext);
2175 leaderActorContext.setCurrentBehavior(leader);
2177 // Initial heartbeat
2178 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2179 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2180 MessageCollectorActor.clearMessages(followerActor);
2182 sendReplicate(leaderActorContext, 0);
2183 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2185 MessageCollectorActor.clearMessages(followerActor);
2187 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2188 leader.transferLeadership(mockTransferCohort);
2190 verify(mockTransferCohort, never()).transferComplete();
2192 // Send heartbeats to time out the transfer.
2193 for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2194 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2195 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2196 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2199 verify(mockTransferCohort).abortTransfer();
2200 verify(mockTransferCohort, never()).transferComplete();
2201 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2205 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2206 ActorRef actorRef, RaftRPC rpc) throws Exception {
2207 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2208 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2211 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2213 private final long electionTimeOutIntervalMillis;
2214 private final int snapshotChunkSize;
2216 MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2218 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2219 this.snapshotChunkSize = snapshotChunkSize;
2223 public FiniteDuration getElectionTimeOutInterval() {
2224 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2228 public int getSnapshotChunkSize() {
2229 return snapshotChunkSize;