2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorRef;
20 import akka.actor.PoisonPill;
21 import akka.actor.Props;
22 import akka.actor.Terminated;
23 import akka.testkit.JavaTestKit;
24 import akka.testkit.TestActorRef;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.google.protobuf.ByteString;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.List;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
37 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
38 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
39 import org.opendaylight.controller.cluster.raft.RaftActorContext;
40 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
41 import org.opendaylight.controller.cluster.raft.RaftState;
42 import org.opendaylight.controller.cluster.raft.RaftVersions;
43 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
44 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
45 import org.opendaylight.controller.cluster.raft.Snapshot;
46 import org.opendaylight.controller.cluster.raft.VotingState;
47 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
49 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
50 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
51 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
54 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
55 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
60 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
61 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
62 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
63 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
64 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
65 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
66 import org.opendaylight.yangtools.concepts.Identifier;
67 import scala.concurrent.duration.FiniteDuration;
69 public class LeaderTest extends AbstractLeaderTest<Leader> {
71 static final String FOLLOWER_ID = "follower";
72 public static final String LEADER_ID = "leader";
74 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
75 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
77 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
78 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
80 private Leader leader;
81 private final short payloadVersion = 5;
85 public void tearDown() throws Exception {
94 public void testHandleMessageForUnknownMessage() throws Exception {
95 logStart("testHandleMessageForUnknownMessage");
97 leader = new Leader(createActorContext());
99 // handle message should null when it receives an unknown message
100 assertNull(leader.handleMessage(followerActor, "foo"));
104 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
105 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
107 MockRaftActorContext actorContext = createActorContextWithFollower();
108 actorContext.setCommitIndex(-1);
109 short payloadVersion = (short)5;
110 actorContext.setPayloadVersion(payloadVersion);
113 actorContext.getTermInformation().update(term, "");
115 leader = new Leader(actorContext);
116 actorContext.setCurrentBehavior(leader);
118 // Leader should send an immediate heartbeat with no entries as follower is inactive.
119 long lastIndex = actorContext.getReplicatedLog().lastIndex();
120 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
121 assertEquals("getTerm", term, appendEntries.getTerm());
122 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
123 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
124 assertEquals("Entries size", 0, appendEntries.getEntries().size());
125 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
127 // The follower would normally reply - simulate that explicitly here.
128 leader.handleMessage(followerActor, new AppendEntriesReply(
129 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
130 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
132 followerActor.underlyingActor().clear();
134 // Sleep for the heartbeat interval so AppendEntries is sent.
135 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
136 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
138 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
140 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
141 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
142 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
143 assertEquals("Entries size", 1, appendEntries.getEntries().size());
144 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
145 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
146 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
150 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
151 return sendReplicate(actorContext, 1, index);
154 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
155 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
156 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
157 term, index, payload);
158 actorContext.getReplicatedLog().append(newEntry);
159 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
163 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
164 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
166 MockRaftActorContext actorContext = createActorContextWithFollower();
169 actorContext.getTermInformation().update(term, "");
171 leader = new Leader(actorContext);
173 // Leader will send an immediate heartbeat - ignore it.
174 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
176 // The follower would normally reply - simulate that explicitly here.
177 long lastIndex = actorContext.getReplicatedLog().lastIndex();
178 leader.handleMessage(followerActor, new AppendEntriesReply(
179 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
180 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
182 followerActor.underlyingActor().clear();
184 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
186 // State should not change
187 assertTrue(raftBehavior instanceof Leader);
189 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
190 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
191 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
192 assertEquals("Entries size", 1, appendEntries.getEntries().size());
193 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
194 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
195 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
196 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
200 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
201 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
203 MockRaftActorContext actorContext = createActorContextWithFollower();
204 actorContext.setCommitIndex(-1);
206 // The raft context is initialized with a couple log entries. However the commitIndex
207 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
208 // committed and applied. Now it regains leadership with a higher term (2).
209 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
210 long newTerm = prevTerm + 1;
211 actorContext.getTermInformation().update(newTerm, "");
213 leader = new Leader(actorContext);
214 actorContext.setCurrentBehavior(leader);
216 // Leader will send an immediate heartbeat - ignore it.
217 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
219 // The follower replies with the leader's current last index and term, simulating that it is
220 // up to date with the leader.
221 long lastIndex = actorContext.getReplicatedLog().lastIndex();
222 leader.handleMessage(followerActor, new AppendEntriesReply(
223 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
225 // The commit index should not get updated even though consensus was reached. This is b/c the
226 // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
227 // from previous terms by counting replicas".
228 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
230 followerActor.underlyingActor().clear();
232 // Now replicate a new entry with the new term 2.
233 long newIndex = lastIndex + 1;
234 sendReplicate(actorContext, newTerm, newIndex);
236 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
237 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
238 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
239 assertEquals("Entries size", 1, appendEntries.getEntries().size());
240 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
241 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
242 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
244 // The follower replies with success. The leader should now update the commit index to the new index
245 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
246 // prior entries are committed indirectly".
247 leader.handleMessage(followerActor, new AppendEntriesReply(
248 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
250 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
254 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
255 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
257 MockRaftActorContext actorContext = createActorContextWithFollower();
258 actorContext.setRaftPolicy(createRaftPolicy(true, true));
261 actorContext.getTermInformation().update(term, "");
263 leader = new Leader(actorContext);
265 // Leader will send an immediate heartbeat - ignore it.
266 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
268 // The follower would normally reply - simulate that explicitly here.
269 long lastIndex = actorContext.getReplicatedLog().lastIndex();
270 leader.handleMessage(followerActor, new AppendEntriesReply(
271 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
272 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
274 followerActor.underlyingActor().clear();
276 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
278 // State should not change
279 assertTrue(raftBehavior instanceof Leader);
281 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
282 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
283 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
284 assertEquals("Entries size", 1, appendEntries.getEntries().size());
285 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
286 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
287 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
288 assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
292 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
293 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
295 MockRaftActorContext actorContext = createActorContextWithFollower();
296 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
298 public FiniteDuration getHeartBeatInterval() {
299 return FiniteDuration.apply(5, TimeUnit.SECONDS);
304 actorContext.getTermInformation().update(term, "");
306 leader = new Leader(actorContext);
308 // Leader will send an immediate heartbeat - ignore it.
309 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
311 // The follower would normally reply - simulate that explicitly here.
312 long lastIndex = actorContext.getReplicatedLog().lastIndex();
313 leader.handleMessage(followerActor, new AppendEntriesReply(
314 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
315 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
317 followerActor.underlyingActor().clear();
319 for(int i=0;i<5;i++) {
320 sendReplicate(actorContext, lastIndex+i+1);
323 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
324 // We expect only 1 message to be sent because of two reasons,
325 // - an append entries reply was not received
326 // - the heartbeat interval has not expired
327 // In this scenario if multiple messages are sent they would likely be duplicates
328 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
332 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
333 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
335 MockRaftActorContext actorContext = createActorContextWithFollower();
336 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
338 public FiniteDuration getHeartBeatInterval() {
339 return FiniteDuration.apply(5, TimeUnit.SECONDS);
344 actorContext.getTermInformation().update(term, "");
346 leader = new Leader(actorContext);
348 // Leader will send an immediate heartbeat - ignore it.
349 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
351 // The follower would normally reply - simulate that explicitly here.
352 long lastIndex = actorContext.getReplicatedLog().lastIndex();
353 leader.handleMessage(followerActor, new AppendEntriesReply(
354 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
355 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
357 followerActor.underlyingActor().clear();
359 for(int i=0;i<3;i++) {
360 sendReplicate(actorContext, lastIndex+i+1);
361 leader.handleMessage(followerActor, new AppendEntriesReply(
362 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
366 for(int i=3;i<5;i++) {
367 sendReplicate(actorContext, lastIndex + i + 1);
370 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
371 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
372 // get sent to the follower - but not the 5th
373 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
375 for(int i=0;i<4;i++) {
376 long expected = allMessages.get(i).getEntries().get(0).getIndex();
377 assertEquals(expected, i+2);
382 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
383 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
385 MockRaftActorContext actorContext = createActorContextWithFollower();
386 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
388 public FiniteDuration getHeartBeatInterval() {
389 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
394 actorContext.getTermInformation().update(term, "");
396 leader = new Leader(actorContext);
398 // Leader will send an immediate heartbeat - ignore it.
399 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
401 // The follower would normally reply - simulate that explicitly here.
402 long lastIndex = actorContext.getReplicatedLog().lastIndex();
403 leader.handleMessage(followerActor, new AppendEntriesReply(
404 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
405 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
407 followerActor.underlyingActor().clear();
409 sendReplicate(actorContext, lastIndex+1);
411 // Wait slightly longer than heartbeat duration
412 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
414 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
416 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
417 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
419 assertEquals(1, allMessages.get(0).getEntries().size());
420 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
421 assertEquals(1, allMessages.get(1).getEntries().size());
422 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
427 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
428 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
430 MockRaftActorContext actorContext = createActorContextWithFollower();
431 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
433 public FiniteDuration getHeartBeatInterval() {
434 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
439 actorContext.getTermInformation().update(term, "");
441 leader = new Leader(actorContext);
443 // Leader will send an immediate heartbeat - ignore it.
444 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
446 // The follower would normally reply - simulate that explicitly here.
447 long lastIndex = actorContext.getReplicatedLog().lastIndex();
448 leader.handleMessage(followerActor, new AppendEntriesReply(
449 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
450 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
452 followerActor.underlyingActor().clear();
454 for(int i=0;i<3;i++) {
455 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
456 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
459 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
460 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
464 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
465 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
467 MockRaftActorContext actorContext = createActorContextWithFollower();
468 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
470 public FiniteDuration getHeartBeatInterval() {
471 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
476 actorContext.getTermInformation().update(term, "");
478 leader = new Leader(actorContext);
480 // Leader will send an immediate heartbeat - ignore it.
481 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
483 // The follower would normally reply - simulate that explicitly here.
484 long lastIndex = actorContext.getReplicatedLog().lastIndex();
485 leader.handleMessage(followerActor, new AppendEntriesReply(
486 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
487 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
489 followerActor.underlyingActor().clear();
491 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
492 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
493 sendReplicate(actorContext, lastIndex+1);
495 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
496 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
498 assertEquals(0, allMessages.get(0).getEntries().size());
499 assertEquals(1, allMessages.get(1).getEntries().size());
504 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
505 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
507 MockRaftActorContext actorContext = createActorContext();
509 leader = new Leader(actorContext);
511 actorContext.setLastApplied(0);
513 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
514 long term = actorContext.getTermInformation().getCurrentTerm();
515 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
516 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
518 actorContext.getReplicatedLog().append(newEntry);
520 final Identifier id = new MockIdentifier("state-id");
521 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
523 // State should not change
524 assertTrue(raftBehavior instanceof Leader);
526 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
528 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
529 // one since lastApplied state is 0.
530 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
531 leaderActor, ApplyState.class);
532 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
534 for(int i = 0; i <= newLogIndex - 1; i++ ) {
535 ApplyState applyState = applyStateList.get(i);
536 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
537 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
540 ApplyState last = applyStateList.get((int) newLogIndex - 1);
541 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
542 assertEquals("getIdentifier", id, last.getIdentifier());
546 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
547 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
549 MockRaftActorContext actorContext = createActorContextWithFollower();
551 Map<String, String> leadersSnapshot = new HashMap<>();
552 leadersSnapshot.put("1", "A");
553 leadersSnapshot.put("2", "B");
554 leadersSnapshot.put("3", "C");
557 actorContext.getReplicatedLog().removeFrom(0);
559 final int commitIndex = 3;
560 final int snapshotIndex = 2;
561 final int newEntryIndex = 4;
562 final int snapshotTerm = 1;
563 final int currentTerm = 2;
565 // set the snapshot variables in replicatedlog
566 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
567 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
568 actorContext.setCommitIndex(commitIndex);
569 //set follower timeout to 2 mins, helps during debugging
570 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
572 leader = new Leader(actorContext);
574 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
575 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
578 ReplicatedLogImplEntry entry =
579 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
580 new MockRaftActorContext.MockPayload("D"));
582 //update follower timestamp
583 leader.markFollowerActive(FOLLOWER_ID);
585 ByteString bs = toByteString(leadersSnapshot);
586 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
587 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
588 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
589 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
591 //send first chunk and no InstallSnapshotReply received yet
593 fts.incrementChunkIndex();
595 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
596 TimeUnit.MILLISECONDS);
598 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
600 AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
602 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
604 //InstallSnapshotReply received
605 fts.markSendStatus(true);
607 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
609 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
611 assertEquals(commitIndex, is.getLastIncludedIndex());
615 public void testSendAppendEntriesSnapshotScenario() throws Exception {
616 logStart("testSendAppendEntriesSnapshotScenario");
618 MockRaftActorContext actorContext = createActorContextWithFollower();
620 Map<String, String> leadersSnapshot = new HashMap<>();
621 leadersSnapshot.put("1", "A");
622 leadersSnapshot.put("2", "B");
623 leadersSnapshot.put("3", "C");
626 actorContext.getReplicatedLog().removeFrom(0);
628 final int followersLastIndex = 2;
629 final int snapshotIndex = 3;
630 final int newEntryIndex = 4;
631 final int snapshotTerm = 1;
632 final int currentTerm = 2;
634 // set the snapshot variables in replicatedlog
635 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
636 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
637 actorContext.setCommitIndex(followersLastIndex);
639 leader = new Leader(actorContext);
641 // Leader will send an immediate heartbeat - ignore it.
642 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
645 ReplicatedLogImplEntry entry =
646 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
647 new MockRaftActorContext.MockPayload("D"));
649 actorContext.getReplicatedLog().append(entry);
651 //update follower timestamp
652 leader.markFollowerActive(FOLLOWER_ID);
654 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
655 RaftActorBehavior raftBehavior = leader.handleMessage(
656 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
658 assertTrue(raftBehavior instanceof Leader);
660 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
664 public void testInitiateInstallSnapshot() throws Exception {
665 logStart("testInitiateInstallSnapshot");
667 MockRaftActorContext actorContext = createActorContextWithFollower();
670 actorContext.getReplicatedLog().removeFrom(0);
672 final int followersLastIndex = 2;
673 final int snapshotIndex = 3;
674 final int newEntryIndex = 4;
675 final int snapshotTerm = 1;
676 final int currentTerm = 2;
678 // set the snapshot variables in replicatedlog
679 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
680 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
681 actorContext.setLastApplied(3);
682 actorContext.setCommitIndex(followersLastIndex);
684 leader = new Leader(actorContext);
686 // Leader will send an immediate heartbeat - ignore it.
687 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
689 // set the snapshot as absent and check if capture-snapshot is invoked.
690 leader.setSnapshot(null);
693 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
694 new MockRaftActorContext.MockPayload("D"));
696 actorContext.getReplicatedLog().append(entry);
698 //update follower timestamp
699 leader.markFollowerActive(FOLLOWER_ID);
701 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
703 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
705 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
707 assertTrue(cs.isInstallSnapshotInitiated());
708 assertEquals(3, cs.getLastAppliedIndex());
709 assertEquals(1, cs.getLastAppliedTerm());
710 assertEquals(4, cs.getLastIndex());
711 assertEquals(2, cs.getLastTerm());
713 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
714 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
716 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
720 public void testInitiateForceInstallSnapshot() throws Exception {
721 logStart("testInitiateForceInstallSnapshot");
723 MockRaftActorContext actorContext = createActorContextWithFollower();
725 final int followersLastIndex = 2;
726 final int snapshotIndex = -1;
727 final int newEntryIndex = 4;
728 final int snapshotTerm = -1;
729 final int currentTerm = 2;
731 // set the snapshot variables in replicatedlog
732 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
733 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
734 actorContext.setLastApplied(3);
735 actorContext.setCommitIndex(followersLastIndex);
737 actorContext.getReplicatedLog().removeFrom(0);
739 leader = new Leader(actorContext);
741 // Leader will send an immediate heartbeat - ignore it.
742 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
744 // set the snapshot as absent and check if capture-snapshot is invoked.
745 leader.setSnapshot(null);
747 for(int i=0;i<4;i++) {
748 actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
749 new MockRaftActorContext.MockPayload("X" + i)));
753 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
754 new MockRaftActorContext.MockPayload("D"));
756 actorContext.getReplicatedLog().append(entry);
758 //update follower timestamp
759 leader.markFollowerActive(FOLLOWER_ID);
761 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
762 // installed with a SendInstallSnapshot
763 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
765 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
767 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
769 assertTrue(cs.isInstallSnapshotInitiated());
770 assertEquals(3, cs.getLastAppliedIndex());
771 assertEquals(1, cs.getLastAppliedTerm());
772 assertEquals(4, cs.getLastIndex());
773 assertEquals(2, cs.getLastTerm());
775 // if an initiate is started again when first is in progress, it should not initiate Capture
776 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
778 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
783 public void testInstallSnapshot() throws Exception {
784 logStart("testInstallSnapshot");
786 MockRaftActorContext actorContext = createActorContextWithFollower();
788 Map<String, String> leadersSnapshot = new HashMap<>();
789 leadersSnapshot.put("1", "A");
790 leadersSnapshot.put("2", "B");
791 leadersSnapshot.put("3", "C");
794 actorContext.getReplicatedLog().removeFrom(0);
796 final int lastAppliedIndex = 3;
797 final int snapshotIndex = 2;
798 final int snapshotTerm = 1;
799 final int currentTerm = 2;
801 // set the snapshot variables in replicatedlog
802 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
803 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
804 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
805 actorContext.setCommitIndex(lastAppliedIndex);
806 actorContext.setLastApplied(lastAppliedIndex);
808 leader = new Leader(actorContext);
810 // Initial heartbeat.
811 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
813 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
814 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
816 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
817 Collections.<ReplicatedLogEntry>emptyList(),
818 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
820 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
822 assertTrue(raftBehavior instanceof Leader);
824 // check if installsnapshot gets called with the correct values.
826 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
828 assertNotNull(installSnapshot.getData());
829 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
830 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
832 assertEquals(currentTerm, installSnapshot.getTerm());
836 public void testForceInstallSnapshot() throws Exception {
837 logStart("testForceInstallSnapshot");
839 MockRaftActorContext actorContext = createActorContextWithFollower();
841 Map<String, String> leadersSnapshot = new HashMap<>();
842 leadersSnapshot.put("1", "A");
843 leadersSnapshot.put("2", "B");
844 leadersSnapshot.put("3", "C");
846 final int lastAppliedIndex = 3;
847 final int snapshotIndex = -1;
848 final int snapshotTerm = -1;
849 final int currentTerm = 2;
851 // set the snapshot variables in replicatedlog
852 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
853 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
854 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
855 actorContext.setCommitIndex(lastAppliedIndex);
856 actorContext.setLastApplied(lastAppliedIndex);
858 leader = new Leader(actorContext);
860 // Initial heartbeat.
861 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
863 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
864 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
866 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
867 Collections.<ReplicatedLogEntry>emptyList(),
868 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
870 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
872 assertTrue(raftBehavior instanceof Leader);
874 // check if installsnapshot gets called with the correct values.
876 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
878 assertNotNull(installSnapshot.getData());
879 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
880 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
882 assertEquals(currentTerm, installSnapshot.getTerm());
886 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
887 logStart("testHandleInstallSnapshotReplyLastChunk");
889 MockRaftActorContext actorContext = createActorContextWithFollower();
891 final int commitIndex = 3;
892 final int snapshotIndex = 2;
893 final int snapshotTerm = 1;
894 final int currentTerm = 2;
896 actorContext.setCommitIndex(commitIndex);
898 leader = new Leader(actorContext);
899 actorContext.setCurrentBehavior(leader);
901 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
902 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
904 // Ignore initial heartbeat.
905 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
907 Map<String, String> leadersSnapshot = new HashMap<>();
908 leadersSnapshot.put("1", "A");
909 leadersSnapshot.put("2", "B");
910 leadersSnapshot.put("3", "C");
912 // set the snapshot variables in replicatedlog
914 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
915 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
916 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
918 ByteString bs = toByteString(leadersSnapshot);
919 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
920 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
921 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
922 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
923 while(!fts.isLastChunk(fts.getChunkIndex())) {
925 fts.incrementChunkIndex();
929 actorContext.getReplicatedLog().removeFrom(0);
931 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
932 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
934 assertTrue(raftBehavior instanceof Leader);
936 assertEquals(0, leader.followerSnapshotSize());
937 assertEquals(1, leader.followerLogSize());
938 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
940 assertEquals(commitIndex, fli.getMatchIndex());
941 assertEquals(commitIndex + 1, fli.getNextIndex());
945 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
946 logStart("testSendSnapshotfromInstallSnapshotReply");
948 MockRaftActorContext actorContext = createActorContextWithFollower();
950 final int commitIndex = 3;
951 final int snapshotIndex = 2;
952 final int snapshotTerm = 1;
953 final int currentTerm = 2;
955 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
957 public int getSnapshotChunkSize() {
961 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
962 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
964 actorContext.setConfigParams(configParams);
965 actorContext.setCommitIndex(commitIndex);
967 leader = new Leader(actorContext);
968 actorContext.setCurrentBehavior(leader);
970 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
971 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
973 Map<String, String> leadersSnapshot = new HashMap<>();
974 leadersSnapshot.put("1", "A");
975 leadersSnapshot.put("2", "B");
976 leadersSnapshot.put("3", "C");
978 // set the snapshot variables in replicatedlog
979 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
980 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
981 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
983 ByteString bs = toByteString(leadersSnapshot);
984 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
985 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
986 leader.setSnapshot(snapshot);
988 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
990 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
992 assertEquals(1, installSnapshot.getChunkIndex());
993 assertEquals(3, installSnapshot.getTotalChunks());
995 followerActor.underlyingActor().clear();
996 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
997 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
999 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1001 assertEquals(2, installSnapshot.getChunkIndex());
1002 assertEquals(3, installSnapshot.getTotalChunks());
1004 followerActor.underlyingActor().clear();
1005 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1006 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1008 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1010 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1011 followerActor.underlyingActor().clear();
1012 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1013 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1015 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1017 assertNull(installSnapshot);
1022 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
1023 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1025 MockRaftActorContext actorContext = createActorContextWithFollower();
1027 final int commitIndex = 3;
1028 final int snapshotIndex = 2;
1029 final int snapshotTerm = 1;
1030 final int currentTerm = 2;
1032 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
1034 public int getSnapshotChunkSize() {
1039 actorContext.setCommitIndex(commitIndex);
1041 leader = new Leader(actorContext);
1043 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1044 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1046 Map<String, String> leadersSnapshot = new HashMap<>();
1047 leadersSnapshot.put("1", "A");
1048 leadersSnapshot.put("2", "B");
1049 leadersSnapshot.put("3", "C");
1051 // set the snapshot variables in replicatedlog
1052 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1053 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1054 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1056 ByteString bs = toByteString(leadersSnapshot);
1057 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1058 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1059 leader.setSnapshot(snapshot);
1061 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1062 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1064 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1066 assertEquals(1, installSnapshot.getChunkIndex());
1067 assertEquals(3, installSnapshot.getTotalChunks());
1069 followerActor.underlyingActor().clear();
1071 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1072 FOLLOWER_ID, -1, false));
1074 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1075 TimeUnit.MILLISECONDS);
1077 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1079 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1081 assertEquals(1, installSnapshot.getChunkIndex());
1082 assertEquals(3, installSnapshot.getTotalChunks());
1086 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1087 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1089 MockRaftActorContext actorContext = createActorContextWithFollower();
1091 final int commitIndex = 3;
1092 final int snapshotIndex = 2;
1093 final int snapshotTerm = 1;
1094 final int currentTerm = 2;
1096 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1098 public int getSnapshotChunkSize() {
1103 actorContext.setCommitIndex(commitIndex);
1105 leader = new Leader(actorContext);
1107 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1108 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1110 Map<String, String> leadersSnapshot = new HashMap<>();
1111 leadersSnapshot.put("1", "A");
1112 leadersSnapshot.put("2", "B");
1113 leadersSnapshot.put("3", "C");
1115 // set the snapshot variables in replicatedlog
1116 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1117 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1118 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1120 ByteString bs = toByteString(leadersSnapshot);
1121 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1122 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1123 leader.setSnapshot(snapshot);
1125 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1127 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1129 assertEquals(1, installSnapshot.getChunkIndex());
1130 assertEquals(3, installSnapshot.getTotalChunks());
1131 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
1133 int hashCode = Arrays.hashCode(installSnapshot.getData());
1135 followerActor.underlyingActor().clear();
1137 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1138 FOLLOWER_ID, 1, true));
1140 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1142 assertEquals(2, installSnapshot.getChunkIndex());
1143 assertEquals(3, installSnapshot.getTotalChunks());
1144 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1148 public void testFollowerToSnapshotLogic() {
1149 logStart("testFollowerToSnapshotLogic");
1151 MockRaftActorContext actorContext = createActorContext();
1153 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1155 public int getSnapshotChunkSize() {
1160 leader = new Leader(actorContext);
1162 Map<String, String> leadersSnapshot = new HashMap<>();
1163 leadersSnapshot.put("1", "A");
1164 leadersSnapshot.put("2", "B");
1165 leadersSnapshot.put("3", "C");
1167 ByteString bs = toByteString(leadersSnapshot);
1168 byte[] barray = bs.toByteArray();
1170 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1171 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1173 assertEquals(bs.size(), barray.length);
1176 for (int i=0; i < barray.length; i = i + 50) {
1180 if (i + 50 > barray.length) {
1184 byte[] chunk = fts.getNextChunk();
1185 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
1186 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1188 fts.markSendStatus(true);
1189 if (!fts.isLastChunk(chunkIndex)) {
1190 fts.incrementChunkIndex();
1194 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1198 protected Leader createBehavior(final RaftActorContext actorContext) {
1199 return new Leader(actorContext);
1203 protected MockRaftActorContext createActorContext() {
1204 return createActorContext(leaderActor);
1208 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1209 return createActorContext(LEADER_ID, actorRef);
1212 private MockRaftActorContext createActorContextWithFollower() {
1213 MockRaftActorContext actorContext = createActorContext();
1214 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1215 followerActor.path().toString()).build());
1216 return actorContext;
1219 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1220 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1221 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1222 configParams.setElectionTimeoutFactor(100000);
1223 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1224 context.setConfigParams(configParams);
1225 context.setPayloadVersion(payloadVersion);
1229 private MockRaftActorContext createFollowerActorContextWithLeader() {
1230 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1231 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1232 followerConfig.setElectionTimeoutFactor(10000);
1233 followerActorContext.setConfigParams(followerConfig);
1234 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1235 return followerActorContext;
1239 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1240 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1242 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1244 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1246 Follower follower = new Follower(followerActorContext);
1247 followerActor.underlyingActor().setBehavior(follower);
1248 followerActorContext.setCurrentBehavior(follower);
1250 Map<String, String> peerAddresses = new HashMap<>();
1251 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1253 leaderActorContext.setPeerAddresses(peerAddresses);
1255 leaderActorContext.getReplicatedLog().removeFrom(0);
1258 leaderActorContext.setReplicatedLog(
1259 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1261 leaderActorContext.setCommitIndex(1);
1263 followerActorContext.getReplicatedLog().removeFrom(0);
1265 // follower too has the exact same log entries and has the same commit index
1266 followerActorContext.setReplicatedLog(
1267 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1269 followerActorContext.setCommitIndex(1);
1271 leader = new Leader(leaderActorContext);
1272 leaderActorContext.setCurrentBehavior(leader);
1274 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1276 assertEquals(1, appendEntries.getLeaderCommit());
1277 assertEquals(0, appendEntries.getEntries().size());
1278 assertEquals(0, appendEntries.getPrevLogIndex());
1280 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1281 leaderActor, AppendEntriesReply.class);
1283 assertEquals(2, appendEntriesReply.getLogLastIndex());
1284 assertEquals(1, appendEntriesReply.getLogLastTerm());
1286 // follower returns its next index
1287 assertEquals(2, appendEntriesReply.getLogLastIndex());
1288 assertEquals(1, appendEntriesReply.getLogLastTerm());
1294 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1295 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1297 MockRaftActorContext leaderActorContext = createActorContext();
1299 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1300 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1302 Follower follower = new Follower(followerActorContext);
1303 followerActor.underlyingActor().setBehavior(follower);
1304 followerActorContext.setCurrentBehavior(follower);
1306 Map<String, String> leaderPeerAddresses = new HashMap<>();
1307 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1309 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1311 leaderActorContext.getReplicatedLog().removeFrom(0);
1313 leaderActorContext.setReplicatedLog(
1314 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1316 leaderActorContext.setCommitIndex(1);
1318 followerActorContext.getReplicatedLog().removeFrom(0);
1320 followerActorContext.setReplicatedLog(
1321 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1323 // follower has the same log entries but its commit index > leaders commit index
1324 followerActorContext.setCommitIndex(2);
1326 leader = new Leader(leaderActorContext);
1328 // Initial heartbeat
1329 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1331 assertEquals(1, appendEntries.getLeaderCommit());
1332 assertEquals(0, appendEntries.getEntries().size());
1333 assertEquals(0, appendEntries.getPrevLogIndex());
1335 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1336 leaderActor, AppendEntriesReply.class);
1338 assertEquals(2, appendEntriesReply.getLogLastIndex());
1339 assertEquals(1, appendEntriesReply.getLogLastTerm());
1341 leaderActor.underlyingActor().setBehavior(follower);
1342 leader.handleMessage(followerActor, appendEntriesReply);
1344 leaderActor.underlyingActor().clear();
1345 followerActor.underlyingActor().clear();
1347 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1348 TimeUnit.MILLISECONDS);
1350 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1352 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1354 assertEquals(2, appendEntries.getLeaderCommit());
1355 assertEquals(0, appendEntries.getEntries().size());
1356 assertEquals(2, appendEntries.getPrevLogIndex());
1358 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1360 assertEquals(2, appendEntriesReply.getLogLastIndex());
1361 assertEquals(1, appendEntriesReply.getLogLastTerm());
1363 assertEquals(2, followerActorContext.getCommitIndex());
1369 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1370 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1372 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1373 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1374 new FiniteDuration(1000, TimeUnit.SECONDS));
1376 leaderActorContext.setReplicatedLog(
1377 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1378 long leaderCommitIndex = 2;
1379 leaderActorContext.setCommitIndex(leaderCommitIndex);
1380 leaderActorContext.setLastApplied(leaderCommitIndex);
1382 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1383 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1385 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1387 followerActorContext.setReplicatedLog(
1388 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1389 followerActorContext.setCommitIndex(0);
1390 followerActorContext.setLastApplied(0);
1392 Follower follower = new Follower(followerActorContext);
1393 followerActor.underlyingActor().setBehavior(follower);
1395 leader = new Leader(leaderActorContext);
1397 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1398 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1400 MessageCollectorActor.clearMessages(followerActor);
1401 MessageCollectorActor.clearMessages(leaderActor);
1403 // Verify initial AppendEntries sent with the leader's current commit index.
1404 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1405 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1406 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1408 leaderActor.underlyingActor().setBehavior(leader);
1410 leader.handleMessage(followerActor, appendEntriesReply);
1412 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1413 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1415 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1416 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1417 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1419 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1420 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1421 appendEntries.getEntries().get(0).getData());
1422 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1423 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1424 appendEntries.getEntries().get(1).getData());
1426 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1427 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1429 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1431 ApplyState applyState = applyStateList.get(0);
1432 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1433 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1434 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1435 applyState.getReplicatedLogEntry().getData());
1437 applyState = applyStateList.get(1);
1438 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1439 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1440 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1441 applyState.getReplicatedLogEntry().getData());
1443 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1444 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1448 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1449 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1451 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1452 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1453 new FiniteDuration(1000, TimeUnit.SECONDS));
1455 leaderActorContext.setReplicatedLog(
1456 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1457 long leaderCommitIndex = 1;
1458 leaderActorContext.setCommitIndex(leaderCommitIndex);
1459 leaderActorContext.setLastApplied(leaderCommitIndex);
1461 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1462 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1464 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1466 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1467 followerActorContext.setCommitIndex(-1);
1468 followerActorContext.setLastApplied(-1);
1470 Follower follower = new Follower(followerActorContext);
1471 followerActor.underlyingActor().setBehavior(follower);
1472 followerActorContext.setCurrentBehavior(follower);
1474 leader = new Leader(leaderActorContext);
1476 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1477 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1479 MessageCollectorActor.clearMessages(followerActor);
1480 MessageCollectorActor.clearMessages(leaderActor);
1482 // Verify initial AppendEntries sent with the leader's current commit index.
1483 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1484 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1485 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1487 leaderActor.underlyingActor().setBehavior(leader);
1488 leaderActorContext.setCurrentBehavior(leader);
1490 leader.handleMessage(followerActor, appendEntriesReply);
1492 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1493 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1495 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1496 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1497 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1499 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1500 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1501 appendEntries.getEntries().get(0).getData());
1502 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1503 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1504 appendEntries.getEntries().get(1).getData());
1506 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1507 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1509 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1511 ApplyState applyState = applyStateList.get(0);
1512 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1513 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1514 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1515 applyState.getReplicatedLogEntry().getData());
1517 applyState = applyStateList.get(1);
1518 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1519 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1520 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1521 applyState.getReplicatedLogEntry().getData());
1523 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1524 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1528 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1529 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1531 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1532 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1533 new FiniteDuration(1000, TimeUnit.SECONDS));
1535 leaderActorContext.setReplicatedLog(
1536 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1537 long leaderCommitIndex = 1;
1538 leaderActorContext.setCommitIndex(leaderCommitIndex);
1539 leaderActorContext.setLastApplied(leaderCommitIndex);
1541 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1542 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1544 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1546 followerActorContext.setReplicatedLog(
1547 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1548 followerActorContext.setCommitIndex(-1);
1549 followerActorContext.setLastApplied(-1);
1551 Follower follower = new Follower(followerActorContext);
1552 followerActor.underlyingActor().setBehavior(follower);
1553 followerActorContext.setCurrentBehavior(follower);
1555 leader = new Leader(leaderActorContext);
1557 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1558 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1560 MessageCollectorActor.clearMessages(followerActor);
1561 MessageCollectorActor.clearMessages(leaderActor);
1563 // Verify initial AppendEntries sent with the leader's current commit index.
1564 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1565 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1566 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1568 leaderActor.underlyingActor().setBehavior(leader);
1569 leaderActorContext.setCurrentBehavior(leader);
1571 leader.handleMessage(followerActor, appendEntriesReply);
1573 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1574 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1576 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1577 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1578 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1580 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1581 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1582 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1583 appendEntries.getEntries().get(0).getData());
1584 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1585 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1586 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1587 appendEntries.getEntries().get(1).getData());
1589 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1590 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1592 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1594 ApplyState applyState = applyStateList.get(0);
1595 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1596 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1597 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1598 applyState.getReplicatedLogEntry().getData());
1600 applyState = applyStateList.get(1);
1601 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1602 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1603 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1604 applyState.getReplicatedLogEntry().getData());
1606 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1607 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1608 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1612 public void testHandleAppendEntriesReplyWithNewerTerm(){
1613 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1615 MockRaftActorContext leaderActorContext = createActorContext();
1616 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1617 new FiniteDuration(10000, TimeUnit.SECONDS));
1619 leaderActorContext.setReplicatedLog(
1620 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1622 leader = new Leader(leaderActorContext);
1623 leaderActor.underlyingActor().setBehavior(leader);
1624 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1626 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1628 assertEquals(false, appendEntriesReply.isSuccess());
1629 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1631 MessageCollectorActor.clearMessages(leaderActor);
1635 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1636 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1638 MockRaftActorContext leaderActorContext = createActorContext();
1639 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1640 new FiniteDuration(10000, TimeUnit.SECONDS));
1642 leaderActorContext.setReplicatedLog(
1643 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1644 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1646 leader = new Leader(leaderActorContext);
1647 leaderActor.underlyingActor().setBehavior(leader);
1648 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1650 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1652 assertEquals(false, appendEntriesReply.isSuccess());
1653 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1655 MessageCollectorActor.clearMessages(leaderActor);
1659 public void testHandleAppendEntriesReplySuccess() throws Exception {
1660 logStart("testHandleAppendEntriesReplySuccess");
1662 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1664 leaderActorContext.setReplicatedLog(
1665 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1667 leaderActorContext.setCommitIndex(1);
1668 leaderActorContext.setLastApplied(1);
1669 leaderActorContext.getTermInformation().update(1, "leader");
1671 leader = new Leader(leaderActorContext);
1673 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1675 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1676 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1678 short payloadVersion = 5;
1679 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1681 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1683 assertEquals(RaftState.Leader, raftActorBehavior.state());
1685 assertEquals(2, leaderActorContext.getCommitIndex());
1687 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1688 leaderActor, ApplyJournalEntries.class);
1690 assertEquals(2, leaderActorContext.getLastApplied());
1692 assertEquals(2, applyJournalEntries.getToIndex());
1694 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1697 assertEquals(1,applyStateList.size());
1699 ApplyState applyState = applyStateList.get(0);
1701 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1703 assertEquals(2, followerInfo.getMatchIndex());
1704 assertEquals(3, followerInfo.getNextIndex());
1705 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1706 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1710 public void testHandleAppendEntriesReplyUnknownFollower(){
1711 logStart("testHandleAppendEntriesReplyUnknownFollower");
1713 MockRaftActorContext leaderActorContext = createActorContext();
1715 leader = new Leader(leaderActorContext);
1717 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1719 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1721 assertEquals(RaftState.Leader, raftActorBehavior.state());
1725 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1726 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1728 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1729 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1730 new FiniteDuration(1000, TimeUnit.SECONDS));
1731 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1733 leaderActorContext.setReplicatedLog(
1734 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1735 long leaderCommitIndex = 3;
1736 leaderActorContext.setCommitIndex(leaderCommitIndex);
1737 leaderActorContext.setLastApplied(leaderCommitIndex);
1739 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1740 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1741 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1742 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1744 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1746 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1747 followerActorContext.setCommitIndex(-1);
1748 followerActorContext.setLastApplied(-1);
1750 Follower follower = new Follower(followerActorContext);
1751 followerActor.underlyingActor().setBehavior(follower);
1752 followerActorContext.setCurrentBehavior(follower);
1754 leader = new Leader(leaderActorContext);
1756 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1757 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1759 MessageCollectorActor.clearMessages(followerActor);
1760 MessageCollectorActor.clearMessages(leaderActor);
1762 // Verify initial AppendEntries sent with the leader's current commit index.
1763 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1764 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1765 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1767 leaderActor.underlyingActor().setBehavior(leader);
1768 leaderActorContext.setCurrentBehavior(leader);
1770 leader.handleMessage(followerActor, appendEntriesReply);
1772 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1773 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1775 appendEntries = appendEntriesList.get(0);
1776 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1777 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1778 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1780 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1781 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1782 appendEntries.getEntries().get(0).getData());
1783 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1784 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1785 appendEntries.getEntries().get(1).getData());
1787 appendEntries = appendEntriesList.get(1);
1788 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1789 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1790 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1792 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1793 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1794 appendEntries.getEntries().get(0).getData());
1795 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1796 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1797 appendEntries.getEntries().get(1).getData());
1799 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1800 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1802 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1804 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1805 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1809 public void testHandleRequestVoteReply(){
1810 logStart("testHandleRequestVoteReply");
1812 MockRaftActorContext leaderActorContext = createActorContext();
1814 leader = new Leader(leaderActorContext);
1816 // Should be a no-op.
1817 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1818 new RequestVoteReply(1, true));
1820 assertEquals(RaftState.Leader, raftActorBehavior.state());
1822 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1824 assertEquals(RaftState.Leader, raftActorBehavior.state());
1828 public void testIsolatedLeaderCheckNoFollowers() {
1829 logStart("testIsolatedLeaderCheckNoFollowers");
1831 MockRaftActorContext leaderActorContext = createActorContext();
1833 leader = new Leader(leaderActorContext);
1834 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1835 assertTrue(behavior instanceof Leader);
1839 public void testIsolatedLeaderCheckNoVotingFollowers() {
1840 logStart("testIsolatedLeaderCheckNoVotingFollowers");
1842 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1843 Follower follower = new Follower(followerActorContext);
1844 followerActor.underlyingActor().setBehavior(follower);
1846 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1847 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1848 new FiniteDuration(1000, TimeUnit.SECONDS));
1849 leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1851 leader = new Leader(leaderActorContext);
1852 leader.getFollower(FOLLOWER_ID).markFollowerActive();
1853 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1854 assertTrue("Expected Leader", behavior instanceof Leader);
1857 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1858 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1859 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1861 MockRaftActorContext leaderActorContext = createActorContext();
1863 Map<String, String> peerAddresses = new HashMap<>();
1864 peerAddresses.put("follower-1", followerActor1.path().toString());
1865 peerAddresses.put("follower-2", followerActor2.path().toString());
1867 leaderActorContext.setPeerAddresses(peerAddresses);
1868 leaderActorContext.setRaftPolicy(raftPolicy);
1870 leader = new Leader(leaderActorContext);
1872 leader.markFollowerActive("follower-1");
1873 leader.markFollowerActive("follower-2");
1874 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1875 assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
1877 // kill 1 follower and verify if that got killed
1878 final JavaTestKit probe = new JavaTestKit(getSystem());
1879 probe.watch(followerActor1);
1880 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1881 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1882 assertEquals(termMsg1.getActor(), followerActor1);
1884 leader.markFollowerInActive("follower-1");
1885 leader.markFollowerActive("follower-2");
1886 behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1887 assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
1889 // kill 2nd follower and leader should change to Isolated leader
1890 followerActor2.tell(PoisonPill.getInstance(), null);
1891 probe.watch(followerActor2);
1892 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1893 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1894 assertEquals(termMsg2.getActor(), followerActor2);
1896 leader.markFollowerInActive("follower-2");
1897 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1901 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1902 logStart("testIsolatedLeaderCheckTwoFollowers");
1904 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1906 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1907 behavior instanceof IsolatedLeader);
1911 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1912 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1914 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1916 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1917 behavior instanceof Leader);
1921 public void testLaggingFollowerStarvation() throws Exception {
1922 logStart("testLaggingFollowerStarvation");
1923 new JavaTestKit(getSystem()) {{
1924 String leaderActorId = actorFactory.generateActorId("leader");
1925 String follower1ActorId = actorFactory.generateActorId("follower");
1926 String follower2ActorId = actorFactory.generateActorId("follower");
1928 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1929 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1930 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1931 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1933 MockRaftActorContext leaderActorContext =
1934 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1936 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1937 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1938 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1940 leaderActorContext.setConfigParams(configParams);
1942 leaderActorContext.setReplicatedLog(
1943 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1945 Map<String, String> peerAddresses = new HashMap<>();
1946 peerAddresses.put(follower1ActorId,
1947 follower1Actor.path().toString());
1948 peerAddresses.put(follower2ActorId,
1949 follower2Actor.path().toString());
1951 leaderActorContext.setPeerAddresses(peerAddresses);
1952 leaderActorContext.getTermInformation().update(1, leaderActorId);
1954 RaftActorBehavior leader = createBehavior(leaderActorContext);
1956 leaderActor.underlyingActor().setBehavior(leader);
1958 for(int i=1;i<6;i++) {
1959 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1960 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1961 assertTrue(newBehavior == leader);
1962 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1965 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1966 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1968 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1969 heartbeats.size() > 1);
1971 // Check if follower-2 got AppendEntries during this time and was not starved
1972 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1974 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1975 appendEntries.size() > 1);
1981 public void testReplicationConsensusWithNonVotingFollower() {
1982 logStart("testReplicationConsensusWithNonVotingFollower");
1984 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1985 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1986 new FiniteDuration(1000, TimeUnit.SECONDS));
1988 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1989 leaderActorContext.setCommitIndex(-1);
1991 String nonVotingFollowerId = "nonvoting-follower";
1992 TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1993 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1995 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1997 leader = new Leader(leaderActorContext);
1998 leaderActorContext.setCurrentBehavior(leader);
2000 // Ignore initial heartbeats
2001 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2002 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2004 MessageCollectorActor.clearMessages(followerActor);
2005 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2006 MessageCollectorActor.clearMessages(leaderActor);
2008 // Send a Replicate message and wait for AppendEntries.
2009 sendReplicate(leaderActorContext, 0);
2011 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2012 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2014 // Send reply only from the voting follower and verify consensus via ApplyState.
2015 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2017 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2019 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2021 MessageCollectorActor.clearMessages(followerActor);
2022 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2023 MessageCollectorActor.clearMessages(leaderActor);
2025 // Send another Replicate message
2026 sendReplicate(leaderActorContext, 1);
2028 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2029 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2030 AppendEntries.class);
2031 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2032 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2034 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2035 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2037 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2039 // Send reply from the voting follower and verify consensus.
2040 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2042 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2046 public void testTransferLeadershipWithFollowerInSync() {
2047 logStart("testTransferLeadershipWithFollowerInSync");
2049 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2050 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2051 new FiniteDuration(1000, TimeUnit.SECONDS));
2052 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2054 leader = new Leader(leaderActorContext);
2055 leaderActorContext.setCurrentBehavior(leader);
2057 // Initial heartbeat
2058 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2059 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2060 MessageCollectorActor.clearMessages(followerActor);
2062 sendReplicate(leaderActorContext, 0);
2063 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2065 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2066 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2067 MessageCollectorActor.clearMessages(followerActor);
2069 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2070 leader.transferLeadership(mockTransferCohort);
2072 verify(mockTransferCohort, never()).transferComplete();
2073 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2074 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2076 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2077 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2079 // Leader should force an election timeout
2080 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2082 verify(mockTransferCohort).transferComplete();
2086 public void testTransferLeadershipWithEmptyLog() {
2087 logStart("testTransferLeadershipWithEmptyLog");
2089 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2090 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2091 new FiniteDuration(1000, TimeUnit.SECONDS));
2092 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2094 leader = new Leader(leaderActorContext);
2095 leaderActorContext.setCurrentBehavior(leader);
2097 // Initial heartbeat
2098 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2099 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2100 MessageCollectorActor.clearMessages(followerActor);
2102 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2103 leader.transferLeadership(mockTransferCohort);
2105 verify(mockTransferCohort, never()).transferComplete();
2106 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2107 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2109 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2110 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2112 // Leader should force an election timeout
2113 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2115 verify(mockTransferCohort).transferComplete();
2119 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2120 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2122 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2123 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2124 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2126 leader = new Leader(leaderActorContext);
2127 leaderActorContext.setCurrentBehavior(leader);
2129 // Initial heartbeat
2130 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2131 MessageCollectorActor.clearMessages(followerActor);
2133 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2134 leader.transferLeadership(mockTransferCohort);
2136 verify(mockTransferCohort, never()).transferComplete();
2138 // Sync up the follower.
2139 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2140 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2141 MessageCollectorActor.clearMessages(followerActor);
2143 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2144 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2145 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2146 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2147 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2149 // Leader should force an election timeout
2150 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2152 verify(mockTransferCohort).transferComplete();
2156 public void testTransferLeadershipWithFollowerSyncTimeout() {
2157 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2159 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2160 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2161 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2162 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2163 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2165 leader = new Leader(leaderActorContext);
2166 leaderActorContext.setCurrentBehavior(leader);
2168 // Initial heartbeat
2169 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2170 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2171 MessageCollectorActor.clearMessages(followerActor);
2173 sendReplicate(leaderActorContext, 0);
2174 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2176 MessageCollectorActor.clearMessages(followerActor);
2178 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2179 leader.transferLeadership(mockTransferCohort);
2181 verify(mockTransferCohort, never()).transferComplete();
2183 // Send heartbeats to time out the transfer.
2184 for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2185 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2186 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2187 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2190 verify(mockTransferCohort).abortTransfer();
2191 verify(mockTransferCohort, never()).transferComplete();
2192 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2196 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2197 ActorRef actorRef, RaftRPC rpc) throws Exception {
2198 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2199 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2202 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2204 private final long electionTimeOutIntervalMillis;
2205 private final int snapshotChunkSize;
2207 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2209 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2210 this.snapshotChunkSize = snapshotChunkSize;
2214 public FiniteDuration getElectionTimeOutInterval() {
2215 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2219 public int getSnapshotChunkSize() {
2220 return snapshotChunkSize;