2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorRef;
20 import akka.actor.PoisonPill;
21 import akka.actor.Props;
22 import akka.actor.Terminated;
23 import akka.testkit.JavaTestKit;
24 import akka.testkit.TestActorRef;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.google.protobuf.ByteString;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.List;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
37 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
38 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
39 import org.opendaylight.controller.cluster.raft.RaftActorContext;
40 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
41 import org.opendaylight.controller.cluster.raft.RaftState;
42 import org.opendaylight.controller.cluster.raft.RaftVersions;
43 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
44 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
45 import org.opendaylight.controller.cluster.raft.SerializationUtils;
46 import org.opendaylight.controller.cluster.raft.Snapshot;
47 import org.opendaylight.controller.cluster.raft.VotingState;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
49 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
50 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
51 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
52 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
54 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
55 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
56 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
58 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
60 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
61 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
62 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
63 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
64 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
65 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
66 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
67 import org.opendaylight.yangtools.concepts.Identifier;
68 import scala.concurrent.duration.FiniteDuration;
70 public class LeaderTest extends AbstractLeaderTest<Leader> {
72 static final String FOLLOWER_ID = "follower";
73 public static final String LEADER_ID = "leader";
75 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
76 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
78 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
79 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
81 private Leader leader;
82 private final short payloadVersion = 5;
86 public void tearDown() throws Exception {
95 public void testHandleMessageForUnknownMessage() throws Exception {
96 logStart("testHandleMessageForUnknownMessage");
98 leader = new Leader(createActorContext());
100 // handle message should null when it receives an unknown message
101 assertNull(leader.handleMessage(followerActor, "foo"));
105 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
106 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
108 MockRaftActorContext actorContext = createActorContextWithFollower();
109 actorContext.setCommitIndex(-1);
110 short payloadVersion = (short)5;
111 actorContext.setPayloadVersion(payloadVersion);
114 actorContext.getTermInformation().update(term, "");
116 leader = new Leader(actorContext);
117 actorContext.setCurrentBehavior(leader);
119 // Leader should send an immediate heartbeat with no entries as follower is inactive.
120 long lastIndex = actorContext.getReplicatedLog().lastIndex();
121 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
122 assertEquals("getTerm", term, appendEntries.getTerm());
123 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
124 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
125 assertEquals("Entries size", 0, appendEntries.getEntries().size());
126 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
128 // The follower would normally reply - simulate that explicitly here.
129 leader.handleMessage(followerActor, new AppendEntriesReply(
130 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
131 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
133 followerActor.underlyingActor().clear();
135 // Sleep for the heartbeat interval so AppendEntries is sent.
136 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
137 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
139 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
141 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
142 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
143 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
144 assertEquals("Entries size", 1, appendEntries.getEntries().size());
145 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
146 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
147 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
151 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
152 return sendReplicate(actorContext, 1, index);
155 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
156 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
157 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
158 term, index, payload);
159 actorContext.getReplicatedLog().append(newEntry);
160 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
164 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
165 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
167 MockRaftActorContext actorContext = createActorContextWithFollower();
170 actorContext.getTermInformation().update(term, "");
172 leader = new Leader(actorContext);
174 // Leader will send an immediate heartbeat - ignore it.
175 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
177 // The follower would normally reply - simulate that explicitly here.
178 long lastIndex = actorContext.getReplicatedLog().lastIndex();
179 leader.handleMessage(followerActor, new AppendEntriesReply(
180 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
181 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
183 followerActor.underlyingActor().clear();
185 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
187 // State should not change
188 assertTrue(raftBehavior instanceof Leader);
190 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
191 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
192 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
193 assertEquals("Entries size", 1, appendEntries.getEntries().size());
194 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
195 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
196 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
197 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
201 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
202 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
204 MockRaftActorContext actorContext = createActorContextWithFollower();
205 actorContext.setCommitIndex(-1);
207 // The raft context is initialized with a couple log entries. However the commitIndex
208 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
209 // committed and applied. Now it regains leadership with a higher term (2).
210 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
211 long newTerm = prevTerm + 1;
212 actorContext.getTermInformation().update(newTerm, "");
214 leader = new Leader(actorContext);
215 actorContext.setCurrentBehavior(leader);
217 // Leader will send an immediate heartbeat - ignore it.
218 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
220 // The follower replies with the leader's current last index and term, simulating that it is
221 // up to date with the leader.
222 long lastIndex = actorContext.getReplicatedLog().lastIndex();
223 leader.handleMessage(followerActor, new AppendEntriesReply(
224 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
226 // The commit index should not get updated even though consensus was reached. This is b/c the
227 // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
228 // from previous terms by counting replicas".
229 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
231 followerActor.underlyingActor().clear();
233 // Now replicate a new entry with the new term 2.
234 long newIndex = lastIndex + 1;
235 sendReplicate(actorContext, newTerm, newIndex);
237 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
238 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
239 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
240 assertEquals("Entries size", 1, appendEntries.getEntries().size());
241 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
242 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
243 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
245 // The follower replies with success. The leader should now update the commit index to the new index
246 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
247 // prior entries are committed indirectly".
248 leader.handleMessage(followerActor, new AppendEntriesReply(
249 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
251 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
255 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
256 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
258 MockRaftActorContext actorContext = createActorContextWithFollower();
259 actorContext.setRaftPolicy(createRaftPolicy(true, true));
262 actorContext.getTermInformation().update(term, "");
264 leader = new Leader(actorContext);
266 // Leader will send an immediate heartbeat - ignore it.
267 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
269 // The follower would normally reply - simulate that explicitly here.
270 long lastIndex = actorContext.getReplicatedLog().lastIndex();
271 leader.handleMessage(followerActor, new AppendEntriesReply(
272 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
273 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
275 followerActor.underlyingActor().clear();
277 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
279 // State should not change
280 assertTrue(raftBehavior instanceof Leader);
282 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
283 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
284 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
285 assertEquals("Entries size", 1, appendEntries.getEntries().size());
286 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
287 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
288 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
289 assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
293 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
294 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
296 MockRaftActorContext actorContext = createActorContextWithFollower();
297 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
299 public FiniteDuration getHeartBeatInterval() {
300 return FiniteDuration.apply(5, TimeUnit.SECONDS);
305 actorContext.getTermInformation().update(term, "");
307 leader = new Leader(actorContext);
309 // Leader will send an immediate heartbeat - ignore it.
310 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
312 // The follower would normally reply - simulate that explicitly here.
313 long lastIndex = actorContext.getReplicatedLog().lastIndex();
314 leader.handleMessage(followerActor, new AppendEntriesReply(
315 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
316 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
318 followerActor.underlyingActor().clear();
320 for(int i=0;i<5;i++) {
321 sendReplicate(actorContext, lastIndex+i+1);
324 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
325 // We expect only 1 message to be sent because of two reasons,
326 // - an append entries reply was not received
327 // - the heartbeat interval has not expired
328 // In this scenario if multiple messages are sent they would likely be duplicates
329 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
333 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
334 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
336 MockRaftActorContext actorContext = createActorContextWithFollower();
337 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
339 public FiniteDuration getHeartBeatInterval() {
340 return FiniteDuration.apply(5, TimeUnit.SECONDS);
345 actorContext.getTermInformation().update(term, "");
347 leader = new Leader(actorContext);
349 // Leader will send an immediate heartbeat - ignore it.
350 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
352 // The follower would normally reply - simulate that explicitly here.
353 long lastIndex = actorContext.getReplicatedLog().lastIndex();
354 leader.handleMessage(followerActor, new AppendEntriesReply(
355 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
356 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
358 followerActor.underlyingActor().clear();
360 for(int i=0;i<3;i++) {
361 sendReplicate(actorContext, lastIndex+i+1);
362 leader.handleMessage(followerActor, new AppendEntriesReply(
363 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
367 for(int i=3;i<5;i++) {
368 sendReplicate(actorContext, lastIndex + i + 1);
371 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
372 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
373 // get sent to the follower - but not the 5th
374 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
376 for(int i=0;i<4;i++) {
377 long expected = allMessages.get(i).getEntries().get(0).getIndex();
378 assertEquals(expected, i+2);
383 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
384 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
386 MockRaftActorContext actorContext = createActorContextWithFollower();
387 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
389 public FiniteDuration getHeartBeatInterval() {
390 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
395 actorContext.getTermInformation().update(term, "");
397 leader = new Leader(actorContext);
399 // Leader will send an immediate heartbeat - ignore it.
400 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
402 // The follower would normally reply - simulate that explicitly here.
403 long lastIndex = actorContext.getReplicatedLog().lastIndex();
404 leader.handleMessage(followerActor, new AppendEntriesReply(
405 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
406 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
408 followerActor.underlyingActor().clear();
410 sendReplicate(actorContext, lastIndex+1);
412 // Wait slightly longer than heartbeat duration
413 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
415 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
417 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
418 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
420 assertEquals(1, allMessages.get(0).getEntries().size());
421 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
422 assertEquals(1, allMessages.get(1).getEntries().size());
423 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
428 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
429 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
431 MockRaftActorContext actorContext = createActorContextWithFollower();
432 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
434 public FiniteDuration getHeartBeatInterval() {
435 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
440 actorContext.getTermInformation().update(term, "");
442 leader = new Leader(actorContext);
444 // Leader will send an immediate heartbeat - ignore it.
445 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
447 // The follower would normally reply - simulate that explicitly here.
448 long lastIndex = actorContext.getReplicatedLog().lastIndex();
449 leader.handleMessage(followerActor, new AppendEntriesReply(
450 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
451 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
453 followerActor.underlyingActor().clear();
455 for(int i=0;i<3;i++) {
456 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
457 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
460 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
461 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
465 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
466 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
468 MockRaftActorContext actorContext = createActorContextWithFollower();
469 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
471 public FiniteDuration getHeartBeatInterval() {
472 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
477 actorContext.getTermInformation().update(term, "");
479 leader = new Leader(actorContext);
481 // Leader will send an immediate heartbeat - ignore it.
482 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
484 // The follower would normally reply - simulate that explicitly here.
485 long lastIndex = actorContext.getReplicatedLog().lastIndex();
486 leader.handleMessage(followerActor, new AppendEntriesReply(
487 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
488 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
490 followerActor.underlyingActor().clear();
492 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
493 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
494 sendReplicate(actorContext, lastIndex+1);
496 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
497 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
499 assertEquals(0, allMessages.get(0).getEntries().size());
500 assertEquals(1, allMessages.get(1).getEntries().size());
505 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
506 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
508 MockRaftActorContext actorContext = createActorContext();
510 leader = new Leader(actorContext);
512 actorContext.setLastApplied(0);
514 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
515 long term = actorContext.getTermInformation().getCurrentTerm();
516 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
517 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
519 actorContext.getReplicatedLog().append(newEntry);
521 final Identifier id = new MockIdentifier("state-id");
522 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
524 // State should not change
525 assertTrue(raftBehavior instanceof Leader);
527 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
529 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
530 // one since lastApplied state is 0.
531 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
532 leaderActor, ApplyState.class);
533 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
535 for(int i = 0; i <= newLogIndex - 1; i++ ) {
536 ApplyState applyState = applyStateList.get(i);
537 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
538 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
541 ApplyState last = applyStateList.get((int) newLogIndex - 1);
542 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
543 assertEquals("getIdentifier", id, last.getIdentifier());
547 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
548 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
550 MockRaftActorContext actorContext = createActorContextWithFollower();
552 Map<String, String> leadersSnapshot = new HashMap<>();
553 leadersSnapshot.put("1", "A");
554 leadersSnapshot.put("2", "B");
555 leadersSnapshot.put("3", "C");
558 actorContext.getReplicatedLog().removeFrom(0);
560 final int commitIndex = 3;
561 final int snapshotIndex = 2;
562 final int newEntryIndex = 4;
563 final int snapshotTerm = 1;
564 final int currentTerm = 2;
566 // set the snapshot variables in replicatedlog
567 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
568 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
569 actorContext.setCommitIndex(commitIndex);
570 //set follower timeout to 2 mins, helps during debugging
571 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
573 leader = new Leader(actorContext);
575 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
576 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
579 ReplicatedLogImplEntry entry =
580 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
581 new MockRaftActorContext.MockPayload("D"));
583 //update follower timestamp
584 leader.markFollowerActive(FOLLOWER_ID);
586 ByteString bs = toByteString(leadersSnapshot);
587 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
588 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
589 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
590 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
592 //send first chunk and no InstallSnapshotReply received yet
594 fts.incrementChunkIndex();
596 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
597 TimeUnit.MILLISECONDS);
599 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
601 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
603 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
605 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
607 //InstallSnapshotReply received
608 fts.markSendStatus(true);
610 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
612 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
614 assertEquals(commitIndex, is.getLastIncludedIndex());
618 public void testSendAppendEntriesSnapshotScenario() throws Exception {
619 logStart("testSendAppendEntriesSnapshotScenario");
621 MockRaftActorContext actorContext = createActorContextWithFollower();
623 Map<String, String> leadersSnapshot = new HashMap<>();
624 leadersSnapshot.put("1", "A");
625 leadersSnapshot.put("2", "B");
626 leadersSnapshot.put("3", "C");
629 actorContext.getReplicatedLog().removeFrom(0);
631 final int followersLastIndex = 2;
632 final int snapshotIndex = 3;
633 final int newEntryIndex = 4;
634 final int snapshotTerm = 1;
635 final int currentTerm = 2;
637 // set the snapshot variables in replicatedlog
638 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
639 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
640 actorContext.setCommitIndex(followersLastIndex);
642 leader = new Leader(actorContext);
644 // Leader will send an immediate heartbeat - ignore it.
645 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
648 ReplicatedLogImplEntry entry =
649 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
650 new MockRaftActorContext.MockPayload("D"));
652 actorContext.getReplicatedLog().append(entry);
654 //update follower timestamp
655 leader.markFollowerActive(FOLLOWER_ID);
657 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
658 RaftActorBehavior raftBehavior = leader.handleMessage(
659 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
661 assertTrue(raftBehavior instanceof Leader);
663 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
667 public void testInitiateInstallSnapshot() throws Exception {
668 logStart("testInitiateInstallSnapshot");
670 MockRaftActorContext actorContext = createActorContextWithFollower();
673 actorContext.getReplicatedLog().removeFrom(0);
675 final int followersLastIndex = 2;
676 final int snapshotIndex = 3;
677 final int newEntryIndex = 4;
678 final int snapshotTerm = 1;
679 final int currentTerm = 2;
681 // set the snapshot variables in replicatedlog
682 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
683 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
684 actorContext.setLastApplied(3);
685 actorContext.setCommitIndex(followersLastIndex);
687 leader = new Leader(actorContext);
689 // Leader will send an immediate heartbeat - ignore it.
690 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
692 // set the snapshot as absent and check if capture-snapshot is invoked.
693 leader.setSnapshot(null);
696 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
697 new MockRaftActorContext.MockPayload("D"));
699 actorContext.getReplicatedLog().append(entry);
701 //update follower timestamp
702 leader.markFollowerActive(FOLLOWER_ID);
704 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
706 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
708 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
710 assertTrue(cs.isInstallSnapshotInitiated());
711 assertEquals(3, cs.getLastAppliedIndex());
712 assertEquals(1, cs.getLastAppliedTerm());
713 assertEquals(4, cs.getLastIndex());
714 assertEquals(2, cs.getLastTerm());
716 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
717 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
719 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
723 public void testInitiateForceInstallSnapshot() throws Exception {
724 logStart("testInitiateForceInstallSnapshot");
726 MockRaftActorContext actorContext = createActorContextWithFollower();
728 final int followersLastIndex = 2;
729 final int snapshotIndex = -1;
730 final int newEntryIndex = 4;
731 final int snapshotTerm = -1;
732 final int currentTerm = 2;
734 // set the snapshot variables in replicatedlog
735 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
736 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
737 actorContext.setLastApplied(3);
738 actorContext.setCommitIndex(followersLastIndex);
740 actorContext.getReplicatedLog().removeFrom(0);
742 leader = new Leader(actorContext);
744 // Leader will send an immediate heartbeat - ignore it.
745 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
747 // set the snapshot as absent and check if capture-snapshot is invoked.
748 leader.setSnapshot(null);
750 for(int i=0;i<4;i++) {
751 actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
752 new MockRaftActorContext.MockPayload("X" + i)));
756 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
757 new MockRaftActorContext.MockPayload("D"));
759 actorContext.getReplicatedLog().append(entry);
761 //update follower timestamp
762 leader.markFollowerActive(FOLLOWER_ID);
764 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
765 // installed with a SendInstallSnapshot
766 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
768 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
770 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
772 assertTrue(cs.isInstallSnapshotInitiated());
773 assertEquals(3, cs.getLastAppliedIndex());
774 assertEquals(1, cs.getLastAppliedTerm());
775 assertEquals(4, cs.getLastIndex());
776 assertEquals(2, cs.getLastTerm());
778 // if an initiate is started again when first is in progress, it should not initiate Capture
779 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
781 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
786 public void testInstallSnapshot() throws Exception {
787 logStart("testInstallSnapshot");
789 MockRaftActorContext actorContext = createActorContextWithFollower();
791 Map<String, String> leadersSnapshot = new HashMap<>();
792 leadersSnapshot.put("1", "A");
793 leadersSnapshot.put("2", "B");
794 leadersSnapshot.put("3", "C");
797 actorContext.getReplicatedLog().removeFrom(0);
799 final int lastAppliedIndex = 3;
800 final int snapshotIndex = 2;
801 final int snapshotTerm = 1;
802 final int currentTerm = 2;
804 // set the snapshot variables in replicatedlog
805 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
806 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
807 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
808 actorContext.setCommitIndex(lastAppliedIndex);
809 actorContext.setLastApplied(lastAppliedIndex);
811 leader = new Leader(actorContext);
813 // Initial heartbeat.
814 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
816 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
817 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
819 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
820 Collections.<ReplicatedLogEntry>emptyList(),
821 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
823 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
825 assertTrue(raftBehavior instanceof Leader);
827 // check if installsnapshot gets called with the correct values.
829 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
831 assertNotNull(installSnapshot.getData());
832 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
833 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
835 assertEquals(currentTerm, installSnapshot.getTerm());
839 public void testForceInstallSnapshot() throws Exception {
840 logStart("testForceInstallSnapshot");
842 MockRaftActorContext actorContext = createActorContextWithFollower();
844 Map<String, String> leadersSnapshot = new HashMap<>();
845 leadersSnapshot.put("1", "A");
846 leadersSnapshot.put("2", "B");
847 leadersSnapshot.put("3", "C");
849 final int lastAppliedIndex = 3;
850 final int snapshotIndex = -1;
851 final int snapshotTerm = -1;
852 final int currentTerm = 2;
854 // set the snapshot variables in replicatedlog
855 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
856 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
857 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
858 actorContext.setCommitIndex(lastAppliedIndex);
859 actorContext.setLastApplied(lastAppliedIndex);
861 leader = new Leader(actorContext);
863 // Initial heartbeat.
864 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
866 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
867 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
869 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
870 Collections.<ReplicatedLogEntry>emptyList(),
871 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
873 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
875 assertTrue(raftBehavior instanceof Leader);
877 // check if installsnapshot gets called with the correct values.
879 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
881 assertNotNull(installSnapshot.getData());
882 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
883 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
885 assertEquals(currentTerm, installSnapshot.getTerm());
889 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
890 logStart("testHandleInstallSnapshotReplyLastChunk");
892 MockRaftActorContext actorContext = createActorContextWithFollower();
894 final int commitIndex = 3;
895 final int snapshotIndex = 2;
896 final int snapshotTerm = 1;
897 final int currentTerm = 2;
899 actorContext.setCommitIndex(commitIndex);
901 leader = new Leader(actorContext);
902 actorContext.setCurrentBehavior(leader);
904 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
905 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
907 // Ignore initial heartbeat.
908 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
910 Map<String, String> leadersSnapshot = new HashMap<>();
911 leadersSnapshot.put("1", "A");
912 leadersSnapshot.put("2", "B");
913 leadersSnapshot.put("3", "C");
915 // set the snapshot variables in replicatedlog
917 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
918 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
919 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
921 ByteString bs = toByteString(leadersSnapshot);
922 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
923 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
924 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
925 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
926 while(!fts.isLastChunk(fts.getChunkIndex())) {
928 fts.incrementChunkIndex();
932 actorContext.getReplicatedLog().removeFrom(0);
934 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
935 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
937 assertTrue(raftBehavior instanceof Leader);
939 assertEquals(0, leader.followerSnapshotSize());
940 assertEquals(1, leader.followerLogSize());
941 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
943 assertEquals(commitIndex, fli.getMatchIndex());
944 assertEquals(commitIndex + 1, fli.getNextIndex());
948 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
949 logStart("testSendSnapshotfromInstallSnapshotReply");
951 MockRaftActorContext actorContext = createActorContextWithFollower();
953 final int commitIndex = 3;
954 final int snapshotIndex = 2;
955 final int snapshotTerm = 1;
956 final int currentTerm = 2;
958 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
960 public int getSnapshotChunkSize() {
964 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
965 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
967 actorContext.setConfigParams(configParams);
968 actorContext.setCommitIndex(commitIndex);
970 leader = new Leader(actorContext);
971 actorContext.setCurrentBehavior(leader);
973 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
974 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
976 Map<String, String> leadersSnapshot = new HashMap<>();
977 leadersSnapshot.put("1", "A");
978 leadersSnapshot.put("2", "B");
979 leadersSnapshot.put("3", "C");
981 // set the snapshot variables in replicatedlog
982 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
983 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
984 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
986 ByteString bs = toByteString(leadersSnapshot);
987 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
988 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
989 leader.setSnapshot(snapshot);
991 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
993 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
995 assertEquals(1, installSnapshot.getChunkIndex());
996 assertEquals(3, installSnapshot.getTotalChunks());
998 followerActor.underlyingActor().clear();
999 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1000 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1002 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1004 assertEquals(2, installSnapshot.getChunkIndex());
1005 assertEquals(3, installSnapshot.getTotalChunks());
1007 followerActor.underlyingActor().clear();
1008 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1009 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1011 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1013 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1014 followerActor.underlyingActor().clear();
1015 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1016 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1018 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1020 assertNull(installSnapshot);
1025 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
1026 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1028 MockRaftActorContext actorContext = createActorContextWithFollower();
1030 final int commitIndex = 3;
1031 final int snapshotIndex = 2;
1032 final int snapshotTerm = 1;
1033 final int currentTerm = 2;
1035 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
1037 public int getSnapshotChunkSize() {
1042 actorContext.setCommitIndex(commitIndex);
1044 leader = new Leader(actorContext);
1046 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1047 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1049 Map<String, String> leadersSnapshot = new HashMap<>();
1050 leadersSnapshot.put("1", "A");
1051 leadersSnapshot.put("2", "B");
1052 leadersSnapshot.put("3", "C");
1054 // set the snapshot variables in replicatedlog
1055 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1056 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1057 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1059 ByteString bs = toByteString(leadersSnapshot);
1060 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1061 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1062 leader.setSnapshot(snapshot);
1064 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1065 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1067 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1069 assertEquals(1, installSnapshot.getChunkIndex());
1070 assertEquals(3, installSnapshot.getTotalChunks());
1072 followerActor.underlyingActor().clear();
1074 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1075 FOLLOWER_ID, -1, false));
1077 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1078 TimeUnit.MILLISECONDS);
1080 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1082 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1084 assertEquals(1, installSnapshot.getChunkIndex());
1085 assertEquals(3, installSnapshot.getTotalChunks());
1089 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1090 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1092 MockRaftActorContext actorContext = createActorContextWithFollower();
1094 final int commitIndex = 3;
1095 final int snapshotIndex = 2;
1096 final int snapshotTerm = 1;
1097 final int currentTerm = 2;
1099 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1101 public int getSnapshotChunkSize() {
1106 actorContext.setCommitIndex(commitIndex);
1108 leader = new Leader(actorContext);
1110 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1111 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1113 Map<String, String> leadersSnapshot = new HashMap<>();
1114 leadersSnapshot.put("1", "A");
1115 leadersSnapshot.put("2", "B");
1116 leadersSnapshot.put("3", "C");
1118 // set the snapshot variables in replicatedlog
1119 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1120 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1121 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1123 ByteString bs = toByteString(leadersSnapshot);
1124 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1125 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1126 leader.setSnapshot(snapshot);
1128 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1130 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1132 assertEquals(1, installSnapshot.getChunkIndex());
1133 assertEquals(3, installSnapshot.getTotalChunks());
1134 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
1136 int hashCode = Arrays.hashCode(installSnapshot.getData());
1138 followerActor.underlyingActor().clear();
1140 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1141 FOLLOWER_ID, 1, true));
1143 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1145 assertEquals(2, installSnapshot.getChunkIndex());
1146 assertEquals(3, installSnapshot.getTotalChunks());
1147 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1151 public void testFollowerToSnapshotLogic() {
1152 logStart("testFollowerToSnapshotLogic");
1154 MockRaftActorContext actorContext = createActorContext();
1156 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1158 public int getSnapshotChunkSize() {
1163 leader = new Leader(actorContext);
1165 Map<String, String> leadersSnapshot = new HashMap<>();
1166 leadersSnapshot.put("1", "A");
1167 leadersSnapshot.put("2", "B");
1168 leadersSnapshot.put("3", "C");
1170 ByteString bs = toByteString(leadersSnapshot);
1171 byte[] barray = bs.toByteArray();
1173 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1174 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1176 assertEquals(bs.size(), barray.length);
1179 for (int i=0; i < barray.length; i = i + 50) {
1183 if (i + 50 > barray.length) {
1187 byte[] chunk = fts.getNextChunk();
1188 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
1189 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1191 fts.markSendStatus(true);
1192 if (!fts.isLastChunk(chunkIndex)) {
1193 fts.incrementChunkIndex();
1197 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1201 protected Leader createBehavior(final RaftActorContext actorContext) {
1202 return new Leader(actorContext);
1206 protected MockRaftActorContext createActorContext() {
1207 return createActorContext(leaderActor);
1211 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1212 return createActorContext(LEADER_ID, actorRef);
1215 private MockRaftActorContext createActorContextWithFollower() {
1216 MockRaftActorContext actorContext = createActorContext();
1217 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1218 followerActor.path().toString()).build());
1219 return actorContext;
1222 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1223 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1224 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1225 configParams.setElectionTimeoutFactor(100000);
1226 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1227 context.setConfigParams(configParams);
1228 context.setPayloadVersion(payloadVersion);
1232 private MockRaftActorContext createFollowerActorContextWithLeader() {
1233 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1234 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1235 followerConfig.setElectionTimeoutFactor(10000);
1236 followerActorContext.setConfigParams(followerConfig);
1237 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1238 return followerActorContext;
1242 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1243 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1245 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1247 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1249 Follower follower = new Follower(followerActorContext);
1250 followerActor.underlyingActor().setBehavior(follower);
1251 followerActorContext.setCurrentBehavior(follower);
1253 Map<String, String> peerAddresses = new HashMap<>();
1254 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1256 leaderActorContext.setPeerAddresses(peerAddresses);
1258 leaderActorContext.getReplicatedLog().removeFrom(0);
1261 leaderActorContext.setReplicatedLog(
1262 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1264 leaderActorContext.setCommitIndex(1);
1266 followerActorContext.getReplicatedLog().removeFrom(0);
1268 // follower too has the exact same log entries and has the same commit index
1269 followerActorContext.setReplicatedLog(
1270 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1272 followerActorContext.setCommitIndex(1);
1274 leader = new Leader(leaderActorContext);
1275 leaderActorContext.setCurrentBehavior(leader);
1277 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1279 assertEquals(1, appendEntries.getLeaderCommit());
1280 assertEquals(0, appendEntries.getEntries().size());
1281 assertEquals(0, appendEntries.getPrevLogIndex());
1283 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1284 leaderActor, AppendEntriesReply.class);
1286 assertEquals(2, appendEntriesReply.getLogLastIndex());
1287 assertEquals(1, appendEntriesReply.getLogLastTerm());
1289 // follower returns its next index
1290 assertEquals(2, appendEntriesReply.getLogLastIndex());
1291 assertEquals(1, appendEntriesReply.getLogLastTerm());
1297 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1298 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1300 MockRaftActorContext leaderActorContext = createActorContext();
1302 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1303 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1305 Follower follower = new Follower(followerActorContext);
1306 followerActor.underlyingActor().setBehavior(follower);
1307 followerActorContext.setCurrentBehavior(follower);
1309 Map<String, String> leaderPeerAddresses = new HashMap<>();
1310 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1312 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1314 leaderActorContext.getReplicatedLog().removeFrom(0);
1316 leaderActorContext.setReplicatedLog(
1317 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1319 leaderActorContext.setCommitIndex(1);
1321 followerActorContext.getReplicatedLog().removeFrom(0);
1323 followerActorContext.setReplicatedLog(
1324 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1326 // follower has the same log entries but its commit index > leaders commit index
1327 followerActorContext.setCommitIndex(2);
1329 leader = new Leader(leaderActorContext);
1331 // Initial heartbeat
1332 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1334 assertEquals(1, appendEntries.getLeaderCommit());
1335 assertEquals(0, appendEntries.getEntries().size());
1336 assertEquals(0, appendEntries.getPrevLogIndex());
1338 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1339 leaderActor, AppendEntriesReply.class);
1341 assertEquals(2, appendEntriesReply.getLogLastIndex());
1342 assertEquals(1, appendEntriesReply.getLogLastTerm());
1344 leaderActor.underlyingActor().setBehavior(follower);
1345 leader.handleMessage(followerActor, appendEntriesReply);
1347 leaderActor.underlyingActor().clear();
1348 followerActor.underlyingActor().clear();
1350 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1351 TimeUnit.MILLISECONDS);
1353 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1355 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1357 assertEquals(2, appendEntries.getLeaderCommit());
1358 assertEquals(0, appendEntries.getEntries().size());
1359 assertEquals(2, appendEntries.getPrevLogIndex());
1361 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1363 assertEquals(2, appendEntriesReply.getLogLastIndex());
1364 assertEquals(1, appendEntriesReply.getLogLastTerm());
1366 assertEquals(2, followerActorContext.getCommitIndex());
1372 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1373 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1375 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1376 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1377 new FiniteDuration(1000, TimeUnit.SECONDS));
1379 leaderActorContext.setReplicatedLog(
1380 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1381 long leaderCommitIndex = 2;
1382 leaderActorContext.setCommitIndex(leaderCommitIndex);
1383 leaderActorContext.setLastApplied(leaderCommitIndex);
1385 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1386 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1388 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1390 followerActorContext.setReplicatedLog(
1391 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1392 followerActorContext.setCommitIndex(0);
1393 followerActorContext.setLastApplied(0);
1395 Follower follower = new Follower(followerActorContext);
1396 followerActor.underlyingActor().setBehavior(follower);
1398 leader = new Leader(leaderActorContext);
1400 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1401 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1403 MessageCollectorActor.clearMessages(followerActor);
1404 MessageCollectorActor.clearMessages(leaderActor);
1406 // Verify initial AppendEntries sent with the leader's current commit index.
1407 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1408 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1409 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1411 leaderActor.underlyingActor().setBehavior(leader);
1413 leader.handleMessage(followerActor, appendEntriesReply);
1415 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1416 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1418 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1419 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1420 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1422 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1423 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1424 appendEntries.getEntries().get(0).getData());
1425 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1426 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1427 appendEntries.getEntries().get(1).getData());
1429 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1430 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1432 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1434 ApplyState applyState = applyStateList.get(0);
1435 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1436 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1437 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1438 applyState.getReplicatedLogEntry().getData());
1440 applyState = applyStateList.get(1);
1441 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1442 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1443 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1444 applyState.getReplicatedLogEntry().getData());
1446 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1447 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1451 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1452 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1454 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1455 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1456 new FiniteDuration(1000, TimeUnit.SECONDS));
1458 leaderActorContext.setReplicatedLog(
1459 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1460 long leaderCommitIndex = 1;
1461 leaderActorContext.setCommitIndex(leaderCommitIndex);
1462 leaderActorContext.setLastApplied(leaderCommitIndex);
1464 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1465 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1467 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1469 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1470 followerActorContext.setCommitIndex(-1);
1471 followerActorContext.setLastApplied(-1);
1473 Follower follower = new Follower(followerActorContext);
1474 followerActor.underlyingActor().setBehavior(follower);
1475 followerActorContext.setCurrentBehavior(follower);
1477 leader = new Leader(leaderActorContext);
1479 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1480 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1482 MessageCollectorActor.clearMessages(followerActor);
1483 MessageCollectorActor.clearMessages(leaderActor);
1485 // Verify initial AppendEntries sent with the leader's current commit index.
1486 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1487 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1488 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1490 leaderActor.underlyingActor().setBehavior(leader);
1491 leaderActorContext.setCurrentBehavior(leader);
1493 leader.handleMessage(followerActor, appendEntriesReply);
1495 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1496 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1498 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1499 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1500 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1502 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1503 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1504 appendEntries.getEntries().get(0).getData());
1505 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1506 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1507 appendEntries.getEntries().get(1).getData());
1509 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1510 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1512 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1514 ApplyState applyState = applyStateList.get(0);
1515 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1516 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1517 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1518 applyState.getReplicatedLogEntry().getData());
1520 applyState = applyStateList.get(1);
1521 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1522 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1523 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1524 applyState.getReplicatedLogEntry().getData());
1526 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1527 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1531 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1532 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1534 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1535 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1536 new FiniteDuration(1000, TimeUnit.SECONDS));
1538 leaderActorContext.setReplicatedLog(
1539 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1540 long leaderCommitIndex = 1;
1541 leaderActorContext.setCommitIndex(leaderCommitIndex);
1542 leaderActorContext.setLastApplied(leaderCommitIndex);
1544 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1545 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1547 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1549 followerActorContext.setReplicatedLog(
1550 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1551 followerActorContext.setCommitIndex(-1);
1552 followerActorContext.setLastApplied(-1);
1554 Follower follower = new Follower(followerActorContext);
1555 followerActor.underlyingActor().setBehavior(follower);
1556 followerActorContext.setCurrentBehavior(follower);
1558 leader = new Leader(leaderActorContext);
1560 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1561 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1563 MessageCollectorActor.clearMessages(followerActor);
1564 MessageCollectorActor.clearMessages(leaderActor);
1566 // Verify initial AppendEntries sent with the leader's current commit index.
1567 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1568 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1569 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1571 leaderActor.underlyingActor().setBehavior(leader);
1572 leaderActorContext.setCurrentBehavior(leader);
1574 leader.handleMessage(followerActor, appendEntriesReply);
1576 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1577 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1579 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1580 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1581 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1583 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1584 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1585 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1586 appendEntries.getEntries().get(0).getData());
1587 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1588 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1589 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1590 appendEntries.getEntries().get(1).getData());
1592 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1593 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1595 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1597 ApplyState applyState = applyStateList.get(0);
1598 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1599 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1600 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1601 applyState.getReplicatedLogEntry().getData());
1603 applyState = applyStateList.get(1);
1604 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1605 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1606 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1607 applyState.getReplicatedLogEntry().getData());
1609 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1610 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1611 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1615 public void testHandleAppendEntriesReplyWithNewerTerm(){
1616 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1618 MockRaftActorContext leaderActorContext = createActorContext();
1619 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1620 new FiniteDuration(10000, TimeUnit.SECONDS));
1622 leaderActorContext.setReplicatedLog(
1623 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1625 leader = new Leader(leaderActorContext);
1626 leaderActor.underlyingActor().setBehavior(leader);
1627 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1629 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1631 assertEquals(false, appendEntriesReply.isSuccess());
1632 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1634 MessageCollectorActor.clearMessages(leaderActor);
1638 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1639 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1641 MockRaftActorContext leaderActorContext = createActorContext();
1642 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1643 new FiniteDuration(10000, TimeUnit.SECONDS));
1645 leaderActorContext.setReplicatedLog(
1646 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1647 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1649 leader = new Leader(leaderActorContext);
1650 leaderActor.underlyingActor().setBehavior(leader);
1651 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1653 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1655 assertEquals(false, appendEntriesReply.isSuccess());
1656 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1658 MessageCollectorActor.clearMessages(leaderActor);
1662 public void testHandleAppendEntriesReplySuccess() throws Exception {
1663 logStart("testHandleAppendEntriesReplySuccess");
1665 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1667 leaderActorContext.setReplicatedLog(
1668 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1670 leaderActorContext.setCommitIndex(1);
1671 leaderActorContext.setLastApplied(1);
1672 leaderActorContext.getTermInformation().update(1, "leader");
1674 leader = new Leader(leaderActorContext);
1676 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1678 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1679 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1681 short payloadVersion = 5;
1682 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1684 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1686 assertEquals(RaftState.Leader, raftActorBehavior.state());
1688 assertEquals(2, leaderActorContext.getCommitIndex());
1690 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1691 leaderActor, ApplyJournalEntries.class);
1693 assertEquals(2, leaderActorContext.getLastApplied());
1695 assertEquals(2, applyJournalEntries.getToIndex());
1697 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1700 assertEquals(1,applyStateList.size());
1702 ApplyState applyState = applyStateList.get(0);
1704 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1706 assertEquals(2, followerInfo.getMatchIndex());
1707 assertEquals(3, followerInfo.getNextIndex());
1708 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1709 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1713 public void testHandleAppendEntriesReplyUnknownFollower(){
1714 logStart("testHandleAppendEntriesReplyUnknownFollower");
1716 MockRaftActorContext leaderActorContext = createActorContext();
1718 leader = new Leader(leaderActorContext);
1720 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1722 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1724 assertEquals(RaftState.Leader, raftActorBehavior.state());
1728 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1729 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1731 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1732 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1733 new FiniteDuration(1000, TimeUnit.SECONDS));
1734 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1736 leaderActorContext.setReplicatedLog(
1737 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1738 long leaderCommitIndex = 3;
1739 leaderActorContext.setCommitIndex(leaderCommitIndex);
1740 leaderActorContext.setLastApplied(leaderCommitIndex);
1742 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1743 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1744 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1745 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1747 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1749 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1750 followerActorContext.setCommitIndex(-1);
1751 followerActorContext.setLastApplied(-1);
1753 Follower follower = new Follower(followerActorContext);
1754 followerActor.underlyingActor().setBehavior(follower);
1755 followerActorContext.setCurrentBehavior(follower);
1757 leader = new Leader(leaderActorContext);
1759 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1760 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1762 MessageCollectorActor.clearMessages(followerActor);
1763 MessageCollectorActor.clearMessages(leaderActor);
1765 // Verify initial AppendEntries sent with the leader's current commit index.
1766 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1767 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1768 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1770 leaderActor.underlyingActor().setBehavior(leader);
1771 leaderActorContext.setCurrentBehavior(leader);
1773 leader.handleMessage(followerActor, appendEntriesReply);
1775 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1776 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1778 appendEntries = appendEntriesList.get(0);
1779 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1780 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1781 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1783 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1784 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1785 appendEntries.getEntries().get(0).getData());
1786 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1787 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1788 appendEntries.getEntries().get(1).getData());
1790 appendEntries = appendEntriesList.get(1);
1791 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1792 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1793 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1795 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1796 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1797 appendEntries.getEntries().get(0).getData());
1798 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1799 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1800 appendEntries.getEntries().get(1).getData());
1802 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1803 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1805 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1807 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1808 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1812 public void testHandleRequestVoteReply(){
1813 logStart("testHandleRequestVoteReply");
1815 MockRaftActorContext leaderActorContext = createActorContext();
1817 leader = new Leader(leaderActorContext);
1819 // Should be a no-op.
1820 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1821 new RequestVoteReply(1, true));
1823 assertEquals(RaftState.Leader, raftActorBehavior.state());
1825 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1827 assertEquals(RaftState.Leader, raftActorBehavior.state());
1831 public void testIsolatedLeaderCheckNoFollowers() {
1832 logStart("testIsolatedLeaderCheckNoFollowers");
1834 MockRaftActorContext leaderActorContext = createActorContext();
1836 leader = new Leader(leaderActorContext);
1837 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1838 assertTrue(behavior instanceof Leader);
1842 public void testIsolatedLeaderCheckNoVotingFollowers() {
1843 logStart("testIsolatedLeaderCheckNoVotingFollowers");
1845 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1846 Follower follower = new Follower(followerActorContext);
1847 followerActor.underlyingActor().setBehavior(follower);
1849 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1850 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1851 new FiniteDuration(1000, TimeUnit.SECONDS));
1852 leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1854 leader = new Leader(leaderActorContext);
1855 leader.getFollower(FOLLOWER_ID).markFollowerActive();
1856 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1857 assertTrue("Expected Leader", behavior instanceof Leader);
1860 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1861 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1862 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1864 MockRaftActorContext leaderActorContext = createActorContext();
1866 Map<String, String> peerAddresses = new HashMap<>();
1867 peerAddresses.put("follower-1", followerActor1.path().toString());
1868 peerAddresses.put("follower-2", followerActor2.path().toString());
1870 leaderActorContext.setPeerAddresses(peerAddresses);
1871 leaderActorContext.setRaftPolicy(raftPolicy);
1873 leader = new Leader(leaderActorContext);
1875 leader.markFollowerActive("follower-1");
1876 leader.markFollowerActive("follower-2");
1877 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1878 assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
1880 // kill 1 follower and verify if that got killed
1881 final JavaTestKit probe = new JavaTestKit(getSystem());
1882 probe.watch(followerActor1);
1883 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1884 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1885 assertEquals(termMsg1.getActor(), followerActor1);
1887 leader.markFollowerInActive("follower-1");
1888 leader.markFollowerActive("follower-2");
1889 behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1890 assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
1892 // kill 2nd follower and leader should change to Isolated leader
1893 followerActor2.tell(PoisonPill.getInstance(), null);
1894 probe.watch(followerActor2);
1895 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1896 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1897 assertEquals(termMsg2.getActor(), followerActor2);
1899 leader.markFollowerInActive("follower-2");
1900 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1904 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1905 logStart("testIsolatedLeaderCheckTwoFollowers");
1907 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1909 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1910 behavior instanceof IsolatedLeader);
1914 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1915 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1917 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1919 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1920 behavior instanceof Leader);
1924 public void testLaggingFollowerStarvation() throws Exception {
1925 logStart("testLaggingFollowerStarvation");
1926 new JavaTestKit(getSystem()) {{
1927 String leaderActorId = actorFactory.generateActorId("leader");
1928 String follower1ActorId = actorFactory.generateActorId("follower");
1929 String follower2ActorId = actorFactory.generateActorId("follower");
1931 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1932 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1933 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1934 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1936 MockRaftActorContext leaderActorContext =
1937 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1939 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1940 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1941 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1943 leaderActorContext.setConfigParams(configParams);
1945 leaderActorContext.setReplicatedLog(
1946 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1948 Map<String, String> peerAddresses = new HashMap<>();
1949 peerAddresses.put(follower1ActorId,
1950 follower1Actor.path().toString());
1951 peerAddresses.put(follower2ActorId,
1952 follower2Actor.path().toString());
1954 leaderActorContext.setPeerAddresses(peerAddresses);
1955 leaderActorContext.getTermInformation().update(1, leaderActorId);
1957 RaftActorBehavior leader = createBehavior(leaderActorContext);
1959 leaderActor.underlyingActor().setBehavior(leader);
1961 for(int i=1;i<6;i++) {
1962 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1963 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1964 assertTrue(newBehavior == leader);
1965 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1968 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1969 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1971 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1972 heartbeats.size() > 1);
1974 // Check if follower-2 got AppendEntries during this time and was not starved
1975 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1977 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1978 appendEntries.size() > 1);
1984 public void testReplicationConsensusWithNonVotingFollower() {
1985 logStart("testReplicationConsensusWithNonVotingFollower");
1987 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1988 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1989 new FiniteDuration(1000, TimeUnit.SECONDS));
1991 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1992 leaderActorContext.setCommitIndex(-1);
1994 String nonVotingFollowerId = "nonvoting-follower";
1995 TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1996 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1998 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
2000 leader = new Leader(leaderActorContext);
2001 leaderActorContext.setCurrentBehavior(leader);
2003 // Ignore initial heartbeats
2004 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2005 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2007 MessageCollectorActor.clearMessages(followerActor);
2008 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2009 MessageCollectorActor.clearMessages(leaderActor);
2011 // Send a Replicate message and wait for AppendEntries.
2012 sendReplicate(leaderActorContext, 0);
2014 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2015 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2017 // Send reply only from the voting follower and verify consensus via ApplyState.
2018 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2020 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2022 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2024 MessageCollectorActor.clearMessages(followerActor);
2025 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2026 MessageCollectorActor.clearMessages(leaderActor);
2028 // Send another Replicate message
2029 sendReplicate(leaderActorContext, 1);
2031 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2032 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2033 AppendEntries.class);
2034 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2035 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2037 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2038 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2040 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2042 // Send reply from the voting follower and verify consensus.
2043 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2045 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2049 public void testTransferLeadershipWithFollowerInSync() {
2050 logStart("testTransferLeadershipWithFollowerInSync");
2052 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2053 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2054 new FiniteDuration(1000, TimeUnit.SECONDS));
2055 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2057 leader = new Leader(leaderActorContext);
2058 leaderActorContext.setCurrentBehavior(leader);
2060 // Initial heartbeat
2061 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2062 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2063 MessageCollectorActor.clearMessages(followerActor);
2065 sendReplicate(leaderActorContext, 0);
2066 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2068 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2069 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2070 MessageCollectorActor.clearMessages(followerActor);
2072 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2073 leader.transferLeadership(mockTransferCohort);
2075 verify(mockTransferCohort, never()).transferComplete();
2076 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2077 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2079 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2080 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2082 // Leader should force an election timeout
2083 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2085 verify(mockTransferCohort).transferComplete();
2089 public void testTransferLeadershipWithEmptyLog() {
2090 logStart("testTransferLeadershipWithEmptyLog");
2092 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2093 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2094 new FiniteDuration(1000, TimeUnit.SECONDS));
2095 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2097 leader = new Leader(leaderActorContext);
2098 leaderActorContext.setCurrentBehavior(leader);
2100 // Initial heartbeat
2101 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2102 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2103 MessageCollectorActor.clearMessages(followerActor);
2105 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2106 leader.transferLeadership(mockTransferCohort);
2108 verify(mockTransferCohort, never()).transferComplete();
2109 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2110 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2112 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2113 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2115 // Leader should force an election timeout
2116 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2118 verify(mockTransferCohort).transferComplete();
2122 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2123 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2125 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2126 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2127 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2129 leader = new Leader(leaderActorContext);
2130 leaderActorContext.setCurrentBehavior(leader);
2132 // Initial heartbeat
2133 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2134 MessageCollectorActor.clearMessages(followerActor);
2136 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2137 leader.transferLeadership(mockTransferCohort);
2139 verify(mockTransferCohort, never()).transferComplete();
2141 // Sync up the follower.
2142 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2143 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2144 MessageCollectorActor.clearMessages(followerActor);
2146 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2147 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2148 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2149 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2150 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2152 // Leader should force an election timeout
2153 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2155 verify(mockTransferCohort).transferComplete();
2159 public void testTransferLeadershipWithFollowerSyncTimeout() {
2160 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2162 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2163 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2164 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2165 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2166 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2168 leader = new Leader(leaderActorContext);
2169 leaderActorContext.setCurrentBehavior(leader);
2171 // Initial heartbeat
2172 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2173 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2174 MessageCollectorActor.clearMessages(followerActor);
2176 sendReplicate(leaderActorContext, 0);
2177 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2179 MessageCollectorActor.clearMessages(followerActor);
2181 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2182 leader.transferLeadership(mockTransferCohort);
2184 verify(mockTransferCohort, never()).transferComplete();
2186 // Send heartbeats to time out the transfer.
2187 for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2188 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2189 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2190 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2193 verify(mockTransferCohort).abortTransfer();
2194 verify(mockTransferCohort, never()).transferComplete();
2195 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2199 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2200 ActorRef actorRef, RaftRPC rpc) throws Exception {
2201 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2202 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2205 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2207 private final long electionTimeOutIntervalMillis;
2208 private final int snapshotChunkSize;
2210 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2212 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2213 this.snapshotChunkSize = snapshotChunkSize;
2217 public FiniteDuration getElectionTimeOutInterval() {
2218 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2222 public int getSnapshotChunkSize() {
2223 return snapshotChunkSize;