2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.PoisonPill;
22 import akka.actor.Props;
23 import akka.actor.Terminated;
24 import akka.testkit.JavaTestKit;
25 import akka.testkit.TestActorRef;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.util.concurrent.Uninterruptibles;
28 import com.google.protobuf.ByteString;
29 import java.util.Arrays;
30 import java.util.Collections;
31 import java.util.HashMap;
32 import java.util.List;
34 import java.util.concurrent.TimeUnit;
35 import org.junit.After;
36 import org.junit.Test;
37 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
38 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
39 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
40 import org.opendaylight.controller.cluster.raft.RaftActorContext;
41 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
42 import org.opendaylight.controller.cluster.raft.RaftState;
43 import org.opendaylight.controller.cluster.raft.RaftVersions;
44 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
45 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
46 import org.opendaylight.controller.cluster.raft.Snapshot;
47 import org.opendaylight.controller.cluster.raft.VotingState;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
49 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
50 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
51 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
54 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
57 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
59 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
60 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
61 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
62 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
63 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
64 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
65 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
66 import org.opendaylight.yangtools.concepts.Identifier;
67 import scala.concurrent.duration.FiniteDuration;
69 public class LeaderTest extends AbstractLeaderTest<Leader> {
71 static final String FOLLOWER_ID = "follower";
72 public static final String LEADER_ID = "leader";
74 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
75 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
77 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
78 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
80 private Leader leader;
81 private final short payloadVersion = 5;
85 public void tearDown() throws Exception {
94 public void testHandleMessageForUnknownMessage() throws Exception {
95 logStart("testHandleMessageForUnknownMessage");
97 leader = new Leader(createActorContext());
99 // handle message should null when it receives an unknown message
100 assertNull(leader.handleMessage(followerActor, "foo"));
104 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
105 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
107 MockRaftActorContext actorContext = createActorContextWithFollower();
108 actorContext.setCommitIndex(-1);
109 short payloadVersion = (short)5;
110 actorContext.setPayloadVersion(payloadVersion);
113 actorContext.getTermInformation().update(term, "");
115 leader = new Leader(actorContext);
116 actorContext.setCurrentBehavior(leader);
118 // Leader should send an immediate heartbeat with no entries as follower is inactive.
119 long lastIndex = actorContext.getReplicatedLog().lastIndex();
120 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
121 assertEquals("getTerm", term, appendEntries.getTerm());
122 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
123 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
124 assertEquals("Entries size", 0, appendEntries.getEntries().size());
125 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
127 // The follower would normally reply - simulate that explicitly here.
128 leader.handleMessage(followerActor, new AppendEntriesReply(
129 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
130 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
132 followerActor.underlyingActor().clear();
134 // Sleep for the heartbeat interval so AppendEntries is sent.
135 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
136 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
138 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
140 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
141 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
142 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
143 assertEquals("Entries size", 1, appendEntries.getEntries().size());
144 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
145 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
146 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
150 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
151 return sendReplicate(actorContext, 1, index);
154 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
155 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
156 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
157 term, index, payload);
158 actorContext.getReplicatedLog().append(newEntry);
159 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
163 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
164 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
166 MockRaftActorContext actorContext = createActorContextWithFollower();
169 actorContext.getTermInformation().update(term, "");
171 leader = new Leader(actorContext);
173 // Leader will send an immediate heartbeat - ignore it.
174 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
176 // The follower would normally reply - simulate that explicitly here.
177 long lastIndex = actorContext.getReplicatedLog().lastIndex();
178 leader.handleMessage(followerActor, new AppendEntriesReply(
179 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
180 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
182 followerActor.underlyingActor().clear();
184 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
186 // State should not change
187 assertTrue(raftBehavior instanceof Leader);
189 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
190 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
191 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
192 assertEquals("Entries size", 1, appendEntries.getEntries().size());
193 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
194 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
195 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
196 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
200 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
201 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
203 MockRaftActorContext actorContext = createActorContextWithFollower();
204 actorContext.setCommitIndex(-1);
206 // The raft context is initialized with a couple log entries. However the commitIndex
207 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
208 // committed and applied. Now it regains leadership with a higher term (2).
209 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
210 long newTerm = prevTerm + 1;
211 actorContext.getTermInformation().update(newTerm, "");
213 leader = new Leader(actorContext);
214 actorContext.setCurrentBehavior(leader);
216 // Leader will send an immediate heartbeat - ignore it.
217 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
219 // The follower replies with the leader's current last index and term, simulating that it is
220 // up to date with the leader.
221 long lastIndex = actorContext.getReplicatedLog().lastIndex();
222 leader.handleMessage(followerActor, new AppendEntriesReply(
223 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
225 // The commit index should not get updated even though consensus was reached. This is b/c the
226 // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
227 // from previous terms by counting replicas".
228 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
230 followerActor.underlyingActor().clear();
232 // Now replicate a new entry with the new term 2.
233 long newIndex = lastIndex + 1;
234 sendReplicate(actorContext, newTerm, newIndex);
236 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
237 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
238 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
239 assertEquals("Entries size", 1, appendEntries.getEntries().size());
240 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
241 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
242 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
244 // The follower replies with success. The leader should now update the commit index to the new index
245 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
246 // prior entries are committed indirectly".
247 leader.handleMessage(followerActor, new AppendEntriesReply(
248 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
250 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
254 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
255 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
257 MockRaftActorContext actorContext = createActorContextWithFollower();
258 actorContext.setRaftPolicy(createRaftPolicy(true, true));
261 actorContext.getTermInformation().update(term, "");
263 leader = new Leader(actorContext);
265 // Leader will send an immediate heartbeat - ignore it.
266 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
268 // The follower would normally reply - simulate that explicitly here.
269 long lastIndex = actorContext.getReplicatedLog().lastIndex();
270 leader.handleMessage(followerActor, new AppendEntriesReply(
271 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
272 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
274 followerActor.underlyingActor().clear();
276 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
278 // State should not change
279 assertTrue(raftBehavior instanceof Leader);
281 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
282 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
283 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
284 assertEquals("Entries size", 1, appendEntries.getEntries().size());
285 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
286 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
287 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
288 assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
292 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
293 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
295 MockRaftActorContext actorContext = createActorContextWithFollower();
296 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
298 public FiniteDuration getHeartBeatInterval() {
299 return FiniteDuration.apply(5, TimeUnit.SECONDS);
304 actorContext.getTermInformation().update(term, "");
306 leader = new Leader(actorContext);
308 // Leader will send an immediate heartbeat - ignore it.
309 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
311 // The follower would normally reply - simulate that explicitly here.
312 long lastIndex = actorContext.getReplicatedLog().lastIndex();
313 leader.handleMessage(followerActor, new AppendEntriesReply(
314 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
315 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
317 followerActor.underlyingActor().clear();
319 for(int i=0;i<5;i++) {
320 sendReplicate(actorContext, lastIndex+i+1);
323 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
324 // We expect only 1 message to be sent because of two reasons,
325 // - an append entries reply was not received
326 // - the heartbeat interval has not expired
327 // In this scenario if multiple messages are sent they would likely be duplicates
328 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
332 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
333 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
335 MockRaftActorContext actorContext = createActorContextWithFollower();
336 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
338 public FiniteDuration getHeartBeatInterval() {
339 return FiniteDuration.apply(5, TimeUnit.SECONDS);
344 actorContext.getTermInformation().update(term, "");
346 leader = new Leader(actorContext);
348 // Leader will send an immediate heartbeat - ignore it.
349 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
351 // The follower would normally reply - simulate that explicitly here.
352 long lastIndex = actorContext.getReplicatedLog().lastIndex();
353 leader.handleMessage(followerActor, new AppendEntriesReply(
354 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
355 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
357 followerActor.underlyingActor().clear();
359 for(int i=0;i<3;i++) {
360 sendReplicate(actorContext, lastIndex+i+1);
361 leader.handleMessage(followerActor, new AppendEntriesReply(
362 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
366 for(int i=3;i<5;i++) {
367 sendReplicate(actorContext, lastIndex + i + 1);
370 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
371 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
372 // get sent to the follower - but not the 5th
373 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
375 for(int i=0;i<4;i++) {
376 long expected = allMessages.get(i).getEntries().get(0).getIndex();
377 assertEquals(expected, i+2);
382 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
383 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
385 MockRaftActorContext actorContext = createActorContextWithFollower();
386 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
388 public FiniteDuration getHeartBeatInterval() {
389 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
394 actorContext.getTermInformation().update(term, "");
396 leader = new Leader(actorContext);
398 // Leader will send an immediate heartbeat - ignore it.
399 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
401 // The follower would normally reply - simulate that explicitly here.
402 long lastIndex = actorContext.getReplicatedLog().lastIndex();
403 leader.handleMessage(followerActor, new AppendEntriesReply(
404 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
405 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
407 followerActor.underlyingActor().clear();
409 sendReplicate(actorContext, lastIndex+1);
411 // Wait slightly longer than heartbeat duration
412 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
414 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
416 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
417 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
419 assertEquals(1, allMessages.get(0).getEntries().size());
420 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
421 assertEquals(1, allMessages.get(1).getEntries().size());
422 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
427 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
428 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
430 MockRaftActorContext actorContext = createActorContextWithFollower();
431 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
433 public FiniteDuration getHeartBeatInterval() {
434 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
439 actorContext.getTermInformation().update(term, "");
441 leader = new Leader(actorContext);
443 // Leader will send an immediate heartbeat - ignore it.
444 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
446 // The follower would normally reply - simulate that explicitly here.
447 long lastIndex = actorContext.getReplicatedLog().lastIndex();
448 leader.handleMessage(followerActor, new AppendEntriesReply(
449 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
450 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
452 followerActor.underlyingActor().clear();
454 for(int i=0;i<3;i++) {
455 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
456 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
459 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
460 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
464 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
465 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
467 MockRaftActorContext actorContext = createActorContextWithFollower();
468 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
470 public FiniteDuration getHeartBeatInterval() {
471 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
476 actorContext.getTermInformation().update(term, "");
478 leader = new Leader(actorContext);
480 // Leader will send an immediate heartbeat - ignore it.
481 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
483 // The follower would normally reply - simulate that explicitly here.
484 long lastIndex = actorContext.getReplicatedLog().lastIndex();
485 leader.handleMessage(followerActor, new AppendEntriesReply(
486 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
487 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
489 followerActor.underlyingActor().clear();
491 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
492 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
493 sendReplicate(actorContext, lastIndex+1);
495 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
496 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
498 assertEquals(0, allMessages.get(0).getEntries().size());
499 assertEquals(1, allMessages.get(1).getEntries().size());
504 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
505 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
507 MockRaftActorContext actorContext = createActorContext();
509 leader = new Leader(actorContext);
511 actorContext.setLastApplied(0);
513 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
514 long term = actorContext.getTermInformation().getCurrentTerm();
515 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
516 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
518 actorContext.getReplicatedLog().append(newEntry);
520 final Identifier id = new MockIdentifier("state-id");
521 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
523 // State should not change
524 assertTrue(raftBehavior instanceof Leader);
526 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
528 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
529 // one since lastApplied state is 0.
530 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
531 leaderActor, ApplyState.class);
532 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
534 for(int i = 0; i <= newLogIndex - 1; i++ ) {
535 ApplyState applyState = applyStateList.get(i);
536 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
537 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
540 ApplyState last = applyStateList.get((int) newLogIndex - 1);
541 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
542 assertEquals("getIdentifier", id, last.getIdentifier());
546 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
547 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
549 MockRaftActorContext actorContext = createActorContextWithFollower();
551 Map<String, String> leadersSnapshot = new HashMap<>();
552 leadersSnapshot.put("1", "A");
553 leadersSnapshot.put("2", "B");
554 leadersSnapshot.put("3", "C");
557 actorContext.getReplicatedLog().removeFrom(0);
559 final int commitIndex = 3;
560 final int snapshotIndex = 2;
561 final int newEntryIndex = 4;
562 final int snapshotTerm = 1;
563 final int currentTerm = 2;
565 // set the snapshot variables in replicatedlog
566 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
567 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
568 actorContext.setCommitIndex(commitIndex);
569 //set follower timeout to 2 mins, helps during debugging
570 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
572 leader = new Leader(actorContext);
574 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
575 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
578 ReplicatedLogImplEntry entry =
579 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
580 new MockRaftActorContext.MockPayload("D"));
582 //update follower timestamp
583 leader.markFollowerActive(FOLLOWER_ID);
585 ByteString bs = toByteString(leadersSnapshot);
586 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
587 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
588 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
589 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
590 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
592 //send first chunk and no InstallSnapshotReply received yet
594 fts.incrementChunkIndex();
596 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
597 TimeUnit.MILLISECONDS);
599 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
601 AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
603 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
605 //InstallSnapshotReply received
606 fts.markSendStatus(true);
608 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
610 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
612 assertEquals(commitIndex, is.getLastIncludedIndex());
616 public void testSendAppendEntriesSnapshotScenario() throws Exception {
617 logStart("testSendAppendEntriesSnapshotScenario");
619 MockRaftActorContext actorContext = createActorContextWithFollower();
621 Map<String, String> leadersSnapshot = new HashMap<>();
622 leadersSnapshot.put("1", "A");
623 leadersSnapshot.put("2", "B");
624 leadersSnapshot.put("3", "C");
627 actorContext.getReplicatedLog().removeFrom(0);
629 final int followersLastIndex = 2;
630 final int snapshotIndex = 3;
631 final int newEntryIndex = 4;
632 final int snapshotTerm = 1;
633 final int currentTerm = 2;
635 // set the snapshot variables in replicatedlog
636 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
637 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
638 actorContext.setCommitIndex(followersLastIndex);
640 leader = new Leader(actorContext);
642 // Leader will send an immediate heartbeat - ignore it.
643 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
646 ReplicatedLogImplEntry entry =
647 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
648 new MockRaftActorContext.MockPayload("D"));
650 actorContext.getReplicatedLog().append(entry);
652 //update follower timestamp
653 leader.markFollowerActive(FOLLOWER_ID);
655 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
656 RaftActorBehavior raftBehavior = leader.handleMessage(
657 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
659 assertTrue(raftBehavior instanceof Leader);
661 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
665 public void testInitiateInstallSnapshot() throws Exception {
666 logStart("testInitiateInstallSnapshot");
668 MockRaftActorContext actorContext = createActorContextWithFollower();
671 actorContext.getReplicatedLog().removeFrom(0);
673 final int followersLastIndex = 2;
674 final int snapshotIndex = 3;
675 final int newEntryIndex = 4;
676 final int snapshotTerm = 1;
677 final int currentTerm = 2;
679 // set the snapshot variables in replicatedlog
680 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
681 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
682 actorContext.setLastApplied(3);
683 actorContext.setCommitIndex(followersLastIndex);
685 leader = new Leader(actorContext);
687 // Leader will send an immediate heartbeat - ignore it.
688 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
690 // set the snapshot as absent and check if capture-snapshot is invoked.
691 leader.setSnapshot(null);
694 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
695 new MockRaftActorContext.MockPayload("D"));
697 actorContext.getReplicatedLog().append(entry);
699 //update follower timestamp
700 leader.markFollowerActive(FOLLOWER_ID);
702 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
704 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
706 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
708 assertTrue(cs.isInstallSnapshotInitiated());
709 assertEquals(3, cs.getLastAppliedIndex());
710 assertEquals(1, cs.getLastAppliedTerm());
711 assertEquals(4, cs.getLastIndex());
712 assertEquals(2, cs.getLastTerm());
714 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
715 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
717 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
721 public void testInitiateForceInstallSnapshot() throws Exception {
722 logStart("testInitiateForceInstallSnapshot");
724 MockRaftActorContext actorContext = createActorContextWithFollower();
726 final int followersLastIndex = 2;
727 final int snapshotIndex = -1;
728 final int newEntryIndex = 4;
729 final int snapshotTerm = -1;
730 final int currentTerm = 2;
732 // set the snapshot variables in replicatedlog
733 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
734 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
735 actorContext.setLastApplied(3);
736 actorContext.setCommitIndex(followersLastIndex);
738 actorContext.getReplicatedLog().removeFrom(0);
740 leader = new Leader(actorContext);
742 // Leader will send an immediate heartbeat - ignore it.
743 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
745 // set the snapshot as absent and check if capture-snapshot is invoked.
746 leader.setSnapshot(null);
748 for(int i=0;i<4;i++) {
749 actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
750 new MockRaftActorContext.MockPayload("X" + i)));
754 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
755 new MockRaftActorContext.MockPayload("D"));
757 actorContext.getReplicatedLog().append(entry);
759 //update follower timestamp
760 leader.markFollowerActive(FOLLOWER_ID);
762 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
763 // installed with a SendInstallSnapshot
764 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
766 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
768 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
770 assertTrue(cs.isInstallSnapshotInitiated());
771 assertEquals(3, cs.getLastAppliedIndex());
772 assertEquals(1, cs.getLastAppliedTerm());
773 assertEquals(4, cs.getLastIndex());
774 assertEquals(2, cs.getLastTerm());
776 // if an initiate is started again when first is in progress, it should not initiate Capture
777 leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
779 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
784 public void testInstallSnapshot() throws Exception {
785 logStart("testInstallSnapshot");
787 MockRaftActorContext actorContext = createActorContextWithFollower();
789 Map<String, String> leadersSnapshot = new HashMap<>();
790 leadersSnapshot.put("1", "A");
791 leadersSnapshot.put("2", "B");
792 leadersSnapshot.put("3", "C");
795 actorContext.getReplicatedLog().removeFrom(0);
797 final int lastAppliedIndex = 3;
798 final int snapshotIndex = 2;
799 final int snapshotTerm = 1;
800 final int currentTerm = 2;
802 // set the snapshot variables in replicatedlog
803 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
804 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
805 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
806 actorContext.setCommitIndex(lastAppliedIndex);
807 actorContext.setLastApplied(lastAppliedIndex);
809 leader = new Leader(actorContext);
811 // Initial heartbeat.
812 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
814 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
815 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
817 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
818 Collections.<ReplicatedLogEntry>emptyList(),
819 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
821 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
823 assertTrue(raftBehavior instanceof Leader);
825 // check if installsnapshot gets called with the correct values.
827 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
829 assertNotNull(installSnapshot.getData());
830 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
831 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
833 assertEquals(currentTerm, installSnapshot.getTerm());
837 public void testForceInstallSnapshot() throws Exception {
838 logStart("testForceInstallSnapshot");
840 MockRaftActorContext actorContext = createActorContextWithFollower();
842 Map<String, String> leadersSnapshot = new HashMap<>();
843 leadersSnapshot.put("1", "A");
844 leadersSnapshot.put("2", "B");
845 leadersSnapshot.put("3", "C");
847 final int lastAppliedIndex = 3;
848 final int snapshotIndex = -1;
849 final int snapshotTerm = -1;
850 final int currentTerm = 2;
852 // set the snapshot variables in replicatedlog
853 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
854 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
855 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
856 actorContext.setCommitIndex(lastAppliedIndex);
857 actorContext.setLastApplied(lastAppliedIndex);
859 leader = new Leader(actorContext);
861 // Initial heartbeat.
862 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
864 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
865 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
867 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
868 Collections.<ReplicatedLogEntry>emptyList(),
869 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
871 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
873 assertTrue(raftBehavior instanceof Leader);
875 // check if installsnapshot gets called with the correct values.
877 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
879 assertNotNull(installSnapshot.getData());
880 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
881 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
883 assertEquals(currentTerm, installSnapshot.getTerm());
887 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
888 logStart("testHandleInstallSnapshotReplyLastChunk");
890 MockRaftActorContext actorContext = createActorContextWithFollower();
892 final int commitIndex = 3;
893 final int snapshotIndex = 2;
894 final int snapshotTerm = 1;
895 final int currentTerm = 2;
897 actorContext.setCommitIndex(commitIndex);
899 leader = new Leader(actorContext);
900 actorContext.setCurrentBehavior(leader);
902 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
903 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
905 // Ignore initial heartbeat.
906 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
908 Map<String, String> leadersSnapshot = new HashMap<>();
909 leadersSnapshot.put("1", "A");
910 leadersSnapshot.put("2", "B");
911 leadersSnapshot.put("3", "C");
913 // set the snapshot variables in replicatedlog
915 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
916 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
917 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
919 ByteString bs = toByteString(leadersSnapshot);
920 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
921 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
922 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
923 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
924 leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
925 while(!fts.isLastChunk(fts.getChunkIndex())) {
927 fts.incrementChunkIndex();
931 actorContext.getReplicatedLog().removeFrom(0);
933 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
934 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
936 assertTrue(raftBehavior instanceof Leader);
938 assertEquals(1, leader.followerLogSize());
939 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
941 assertNull(fli.getInstallSnapshotState());
942 assertEquals(commitIndex, fli.getMatchIndex());
943 assertEquals(commitIndex + 1, fli.getNextIndex());
944 assertFalse(leader.hasSnapshot());
948 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
949 logStart("testSendSnapshotfromInstallSnapshotReply");
951 MockRaftActorContext actorContext = createActorContextWithFollower();
953 final int commitIndex = 3;
954 final int snapshotIndex = 2;
955 final int snapshotTerm = 1;
956 final int currentTerm = 2;
958 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
960 public int getSnapshotChunkSize() {
964 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
965 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
967 actorContext.setConfigParams(configParams);
968 actorContext.setCommitIndex(commitIndex);
970 leader = new Leader(actorContext);
971 actorContext.setCurrentBehavior(leader);
973 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
974 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
976 Map<String, String> leadersSnapshot = new HashMap<>();
977 leadersSnapshot.put("1", "A");
978 leadersSnapshot.put("2", "B");
979 leadersSnapshot.put("3", "C");
981 // set the snapshot variables in replicatedlog
982 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
983 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
984 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
986 ByteString bs = toByteString(leadersSnapshot);
987 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
988 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
989 leader.setSnapshot(snapshot);
991 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
993 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
995 assertEquals(1, installSnapshot.getChunkIndex());
996 assertEquals(3, installSnapshot.getTotalChunks());
998 followerActor.underlyingActor().clear();
999 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1000 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1002 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1004 assertEquals(2, installSnapshot.getChunkIndex());
1005 assertEquals(3, installSnapshot.getTotalChunks());
1007 followerActor.underlyingActor().clear();
1008 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1009 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1011 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1013 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1014 followerActor.underlyingActor().clear();
1015 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1016 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1018 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1020 assertNull(installSnapshot);
1025 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
1026 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1028 MockRaftActorContext actorContext = createActorContextWithFollower();
1030 final int commitIndex = 3;
1031 final int snapshotIndex = 2;
1032 final int snapshotTerm = 1;
1033 final int currentTerm = 2;
1035 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
1037 public int getSnapshotChunkSize() {
1042 actorContext.setCommitIndex(commitIndex);
1044 leader = new Leader(actorContext);
1046 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1047 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1049 Map<String, String> leadersSnapshot = new HashMap<>();
1050 leadersSnapshot.put("1", "A");
1051 leadersSnapshot.put("2", "B");
1052 leadersSnapshot.put("3", "C");
1054 // set the snapshot variables in replicatedlog
1055 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1056 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1057 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1059 ByteString bs = toByteString(leadersSnapshot);
1060 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1061 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1062 leader.setSnapshot(snapshot);
1064 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1065 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1067 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1069 assertEquals(1, installSnapshot.getChunkIndex());
1070 assertEquals(3, installSnapshot.getTotalChunks());
1072 followerActor.underlyingActor().clear();
1074 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1075 FOLLOWER_ID, -1, false));
1077 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1078 TimeUnit.MILLISECONDS);
1080 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1082 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1084 assertEquals(1, installSnapshot.getChunkIndex());
1085 assertEquals(3, installSnapshot.getTotalChunks());
1089 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1090 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1092 MockRaftActorContext actorContext = createActorContextWithFollower();
1094 final int commitIndex = 3;
1095 final int snapshotIndex = 2;
1096 final int snapshotTerm = 1;
1097 final int currentTerm = 2;
1099 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1101 public int getSnapshotChunkSize() {
1106 actorContext.setCommitIndex(commitIndex);
1108 leader = new Leader(actorContext);
1110 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1111 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1113 Map<String, String> leadersSnapshot = new HashMap<>();
1114 leadersSnapshot.put("1", "A");
1115 leadersSnapshot.put("2", "B");
1116 leadersSnapshot.put("3", "C");
1118 // set the snapshot variables in replicatedlog
1119 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1120 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1121 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1123 ByteString bs = toByteString(leadersSnapshot);
1124 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1125 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1126 leader.setSnapshot(snapshot);
1128 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1130 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1132 assertEquals(1, installSnapshot.getChunkIndex());
1133 assertEquals(3, installSnapshot.getTotalChunks());
1134 assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1135 installSnapshot.getLastChunkHashCode().get().intValue());
1137 int hashCode = Arrays.hashCode(installSnapshot.getData());
1139 followerActor.underlyingActor().clear();
1141 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1142 FOLLOWER_ID, 1, true));
1144 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1146 assertEquals(2, installSnapshot.getChunkIndex());
1147 assertEquals(3, installSnapshot.getTotalChunks());
1148 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1152 public void testLeaderInstallSnapshotState() {
1153 logStart("testLeaderInstallSnapshotState");
1155 Map<String, String> leadersSnapshot = new HashMap<>();
1156 leadersSnapshot.put("1", "A");
1157 leadersSnapshot.put("2", "B");
1158 leadersSnapshot.put("3", "C");
1160 ByteString bs = toByteString(leadersSnapshot);
1161 byte[] barray = bs.toByteArray();
1163 LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs, 50, "test");
1165 assertEquals(bs.size(), barray.length);
1168 for (int i=0; i < barray.length; i = i + 50) {
1172 if (i + 50 > barray.length) {
1176 byte[] chunk = fts.getNextChunk();
1177 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
1178 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1180 fts.markSendStatus(true);
1181 if (!fts.isLastChunk(chunkIndex)) {
1182 fts.incrementChunkIndex();
1186 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1190 protected Leader createBehavior(final RaftActorContext actorContext) {
1191 return new Leader(actorContext);
1195 protected MockRaftActorContext createActorContext() {
1196 return createActorContext(leaderActor);
1200 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1201 return createActorContext(LEADER_ID, actorRef);
1204 private MockRaftActorContext createActorContextWithFollower() {
1205 MockRaftActorContext actorContext = createActorContext();
1206 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1207 followerActor.path().toString()).build());
1208 return actorContext;
1211 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1212 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1213 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1214 configParams.setElectionTimeoutFactor(100000);
1215 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1216 context.setConfigParams(configParams);
1217 context.setPayloadVersion(payloadVersion);
1221 private MockRaftActorContext createFollowerActorContextWithLeader() {
1222 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1223 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1224 followerConfig.setElectionTimeoutFactor(10000);
1225 followerActorContext.setConfigParams(followerConfig);
1226 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1227 return followerActorContext;
1231 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1232 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1234 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1236 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1238 Follower follower = new Follower(followerActorContext);
1239 followerActor.underlyingActor().setBehavior(follower);
1240 followerActorContext.setCurrentBehavior(follower);
1242 Map<String, String> peerAddresses = new HashMap<>();
1243 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1245 leaderActorContext.setPeerAddresses(peerAddresses);
1247 leaderActorContext.getReplicatedLog().removeFrom(0);
1250 leaderActorContext.setReplicatedLog(
1251 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1253 leaderActorContext.setCommitIndex(1);
1255 followerActorContext.getReplicatedLog().removeFrom(0);
1257 // follower too has the exact same log entries and has the same commit index
1258 followerActorContext.setReplicatedLog(
1259 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1261 followerActorContext.setCommitIndex(1);
1263 leader = new Leader(leaderActorContext);
1264 leaderActorContext.setCurrentBehavior(leader);
1266 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1268 assertEquals(1, appendEntries.getLeaderCommit());
1269 assertEquals(0, appendEntries.getEntries().size());
1270 assertEquals(0, appendEntries.getPrevLogIndex());
1272 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1273 leaderActor, AppendEntriesReply.class);
1275 assertEquals(2, appendEntriesReply.getLogLastIndex());
1276 assertEquals(1, appendEntriesReply.getLogLastTerm());
1278 // follower returns its next index
1279 assertEquals(2, appendEntriesReply.getLogLastIndex());
1280 assertEquals(1, appendEntriesReply.getLogLastTerm());
1286 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1287 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1289 MockRaftActorContext leaderActorContext = createActorContext();
1291 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1292 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1294 Follower follower = new Follower(followerActorContext);
1295 followerActor.underlyingActor().setBehavior(follower);
1296 followerActorContext.setCurrentBehavior(follower);
1298 Map<String, String> leaderPeerAddresses = new HashMap<>();
1299 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1301 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1303 leaderActorContext.getReplicatedLog().removeFrom(0);
1305 leaderActorContext.setReplicatedLog(
1306 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1308 leaderActorContext.setCommitIndex(1);
1310 followerActorContext.getReplicatedLog().removeFrom(0);
1312 followerActorContext.setReplicatedLog(
1313 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1315 // follower has the same log entries but its commit index > leaders commit index
1316 followerActorContext.setCommitIndex(2);
1318 leader = new Leader(leaderActorContext);
1320 // Initial heartbeat
1321 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1323 assertEquals(1, appendEntries.getLeaderCommit());
1324 assertEquals(0, appendEntries.getEntries().size());
1325 assertEquals(0, appendEntries.getPrevLogIndex());
1327 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1328 leaderActor, AppendEntriesReply.class);
1330 assertEquals(2, appendEntriesReply.getLogLastIndex());
1331 assertEquals(1, appendEntriesReply.getLogLastTerm());
1333 leaderActor.underlyingActor().setBehavior(follower);
1334 leader.handleMessage(followerActor, appendEntriesReply);
1336 leaderActor.underlyingActor().clear();
1337 followerActor.underlyingActor().clear();
1339 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1340 TimeUnit.MILLISECONDS);
1342 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1344 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1346 assertEquals(2, appendEntries.getLeaderCommit());
1347 assertEquals(0, appendEntries.getEntries().size());
1348 assertEquals(2, appendEntries.getPrevLogIndex());
1350 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1352 assertEquals(2, appendEntriesReply.getLogLastIndex());
1353 assertEquals(1, appendEntriesReply.getLogLastTerm());
1355 assertEquals(2, followerActorContext.getCommitIndex());
1361 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1362 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1364 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1365 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1366 new FiniteDuration(1000, TimeUnit.SECONDS));
1368 leaderActorContext.setReplicatedLog(
1369 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1370 long leaderCommitIndex = 2;
1371 leaderActorContext.setCommitIndex(leaderCommitIndex);
1372 leaderActorContext.setLastApplied(leaderCommitIndex);
1374 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1375 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1377 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1379 followerActorContext.setReplicatedLog(
1380 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1381 followerActorContext.setCommitIndex(0);
1382 followerActorContext.setLastApplied(0);
1384 Follower follower = new Follower(followerActorContext);
1385 followerActor.underlyingActor().setBehavior(follower);
1387 leader = new Leader(leaderActorContext);
1389 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1390 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1392 MessageCollectorActor.clearMessages(followerActor);
1393 MessageCollectorActor.clearMessages(leaderActor);
1395 // Verify initial AppendEntries sent with the leader's current commit index.
1396 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1397 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1398 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1400 leaderActor.underlyingActor().setBehavior(leader);
1402 leader.handleMessage(followerActor, appendEntriesReply);
1404 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1405 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1407 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1408 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1409 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1411 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1412 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1413 appendEntries.getEntries().get(0).getData());
1414 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1415 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1416 appendEntries.getEntries().get(1).getData());
1418 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1419 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1421 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1423 ApplyState applyState = applyStateList.get(0);
1424 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1425 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1426 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1427 applyState.getReplicatedLogEntry().getData());
1429 applyState = applyStateList.get(1);
1430 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1431 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1432 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1433 applyState.getReplicatedLogEntry().getData());
1435 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1436 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1440 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1441 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1443 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1444 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1445 new FiniteDuration(1000, TimeUnit.SECONDS));
1447 leaderActorContext.setReplicatedLog(
1448 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1449 long leaderCommitIndex = 1;
1450 leaderActorContext.setCommitIndex(leaderCommitIndex);
1451 leaderActorContext.setLastApplied(leaderCommitIndex);
1453 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1454 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1456 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1458 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1459 followerActorContext.setCommitIndex(-1);
1460 followerActorContext.setLastApplied(-1);
1462 Follower follower = new Follower(followerActorContext);
1463 followerActor.underlyingActor().setBehavior(follower);
1464 followerActorContext.setCurrentBehavior(follower);
1466 leader = new Leader(leaderActorContext);
1468 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1469 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1471 MessageCollectorActor.clearMessages(followerActor);
1472 MessageCollectorActor.clearMessages(leaderActor);
1474 // Verify initial AppendEntries sent with the leader's current commit index.
1475 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1476 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1477 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1479 leaderActor.underlyingActor().setBehavior(leader);
1480 leaderActorContext.setCurrentBehavior(leader);
1482 leader.handleMessage(followerActor, appendEntriesReply);
1484 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1485 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1487 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1488 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1489 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1491 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1492 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1493 appendEntries.getEntries().get(0).getData());
1494 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1495 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1496 appendEntries.getEntries().get(1).getData());
1498 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1499 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1501 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1503 ApplyState applyState = applyStateList.get(0);
1504 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1505 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1506 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1507 applyState.getReplicatedLogEntry().getData());
1509 applyState = applyStateList.get(1);
1510 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1511 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1512 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1513 applyState.getReplicatedLogEntry().getData());
1515 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1516 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1520 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1521 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1523 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1524 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1525 new FiniteDuration(1000, TimeUnit.SECONDS));
1527 leaderActorContext.setReplicatedLog(
1528 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1529 long leaderCommitIndex = 1;
1530 leaderActorContext.setCommitIndex(leaderCommitIndex);
1531 leaderActorContext.setLastApplied(leaderCommitIndex);
1533 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1534 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1536 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1538 followerActorContext.setReplicatedLog(
1539 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1540 followerActorContext.setCommitIndex(-1);
1541 followerActorContext.setLastApplied(-1);
1543 Follower follower = new Follower(followerActorContext);
1544 followerActor.underlyingActor().setBehavior(follower);
1545 followerActorContext.setCurrentBehavior(follower);
1547 leader = new Leader(leaderActorContext);
1549 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1550 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1552 MessageCollectorActor.clearMessages(followerActor);
1553 MessageCollectorActor.clearMessages(leaderActor);
1555 // Verify initial AppendEntries sent with the leader's current commit index.
1556 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1557 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1558 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1560 leaderActor.underlyingActor().setBehavior(leader);
1561 leaderActorContext.setCurrentBehavior(leader);
1563 leader.handleMessage(followerActor, appendEntriesReply);
1565 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1566 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1568 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1569 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1570 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1572 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1573 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1574 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1575 appendEntries.getEntries().get(0).getData());
1576 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1577 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1578 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1579 appendEntries.getEntries().get(1).getData());
1581 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1582 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1584 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1586 ApplyState applyState = applyStateList.get(0);
1587 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1588 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1589 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1590 applyState.getReplicatedLogEntry().getData());
1592 applyState = applyStateList.get(1);
1593 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1594 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1595 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1596 applyState.getReplicatedLogEntry().getData());
1598 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1599 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1600 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1604 public void testHandleAppendEntriesReplyWithNewerTerm(){
1605 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1607 MockRaftActorContext leaderActorContext = createActorContext();
1608 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1609 new FiniteDuration(10000, TimeUnit.SECONDS));
1611 leaderActorContext.setReplicatedLog(
1612 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1614 leader = new Leader(leaderActorContext);
1615 leaderActor.underlyingActor().setBehavior(leader);
1616 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1618 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1620 assertEquals(false, appendEntriesReply.isSuccess());
1621 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1623 MessageCollectorActor.clearMessages(leaderActor);
1627 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1628 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1630 MockRaftActorContext leaderActorContext = createActorContext();
1631 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1632 new FiniteDuration(10000, TimeUnit.SECONDS));
1634 leaderActorContext.setReplicatedLog(
1635 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1636 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1638 leader = new Leader(leaderActorContext);
1639 leaderActor.underlyingActor().setBehavior(leader);
1640 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1642 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1644 assertEquals(false, appendEntriesReply.isSuccess());
1645 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1647 MessageCollectorActor.clearMessages(leaderActor);
1651 public void testHandleAppendEntriesReplySuccess() throws Exception {
1652 logStart("testHandleAppendEntriesReplySuccess");
1654 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1656 leaderActorContext.setReplicatedLog(
1657 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1659 leaderActorContext.setCommitIndex(1);
1660 leaderActorContext.setLastApplied(1);
1661 leaderActorContext.getTermInformation().update(1, "leader");
1663 leader = new Leader(leaderActorContext);
1665 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1667 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1668 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1670 short payloadVersion = 5;
1671 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1673 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1675 assertEquals(RaftState.Leader, raftActorBehavior.state());
1677 assertEquals(2, leaderActorContext.getCommitIndex());
1679 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1680 leaderActor, ApplyJournalEntries.class);
1682 assertEquals(2, leaderActorContext.getLastApplied());
1684 assertEquals(2, applyJournalEntries.getToIndex());
1686 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1689 assertEquals(1,applyStateList.size());
1691 ApplyState applyState = applyStateList.get(0);
1693 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1695 assertEquals(2, followerInfo.getMatchIndex());
1696 assertEquals(3, followerInfo.getNextIndex());
1697 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1698 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1702 public void testHandleAppendEntriesReplyUnknownFollower(){
1703 logStart("testHandleAppendEntriesReplyUnknownFollower");
1705 MockRaftActorContext leaderActorContext = createActorContext();
1707 leader = new Leader(leaderActorContext);
1709 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1711 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1713 assertEquals(RaftState.Leader, raftActorBehavior.state());
1717 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1718 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1720 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1721 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1722 new FiniteDuration(1000, TimeUnit.SECONDS));
1723 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1725 leaderActorContext.setReplicatedLog(
1726 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1727 long leaderCommitIndex = 3;
1728 leaderActorContext.setCommitIndex(leaderCommitIndex);
1729 leaderActorContext.setLastApplied(leaderCommitIndex);
1731 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1732 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1733 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1734 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1736 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1738 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1739 followerActorContext.setCommitIndex(-1);
1740 followerActorContext.setLastApplied(-1);
1742 Follower follower = new Follower(followerActorContext);
1743 followerActor.underlyingActor().setBehavior(follower);
1744 followerActorContext.setCurrentBehavior(follower);
1746 leader = new Leader(leaderActorContext);
1748 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1749 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1751 MessageCollectorActor.clearMessages(followerActor);
1752 MessageCollectorActor.clearMessages(leaderActor);
1754 // Verify initial AppendEntries sent with the leader's current commit index.
1755 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1756 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1757 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1759 leaderActor.underlyingActor().setBehavior(leader);
1760 leaderActorContext.setCurrentBehavior(leader);
1762 leader.handleMessage(followerActor, appendEntriesReply);
1764 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1765 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1767 appendEntries = appendEntriesList.get(0);
1768 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1769 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1770 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1772 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1773 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1774 appendEntries.getEntries().get(0).getData());
1775 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1776 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1777 appendEntries.getEntries().get(1).getData());
1779 appendEntries = appendEntriesList.get(1);
1780 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1781 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1782 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1784 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1785 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1786 appendEntries.getEntries().get(0).getData());
1787 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1788 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1789 appendEntries.getEntries().get(1).getData());
1791 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1792 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1794 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1796 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1797 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1801 public void testHandleRequestVoteReply(){
1802 logStart("testHandleRequestVoteReply");
1804 MockRaftActorContext leaderActorContext = createActorContext();
1806 leader = new Leader(leaderActorContext);
1808 // Should be a no-op.
1809 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1810 new RequestVoteReply(1, true));
1812 assertEquals(RaftState.Leader, raftActorBehavior.state());
1814 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1816 assertEquals(RaftState.Leader, raftActorBehavior.state());
1820 public void testIsolatedLeaderCheckNoFollowers() {
1821 logStart("testIsolatedLeaderCheckNoFollowers");
1823 MockRaftActorContext leaderActorContext = createActorContext();
1825 leader = new Leader(leaderActorContext);
1826 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1827 assertTrue(behavior instanceof Leader);
1831 public void testIsolatedLeaderCheckNoVotingFollowers() {
1832 logStart("testIsolatedLeaderCheckNoVotingFollowers");
1834 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1835 Follower follower = new Follower(followerActorContext);
1836 followerActor.underlyingActor().setBehavior(follower);
1838 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1839 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1840 new FiniteDuration(1000, TimeUnit.SECONDS));
1841 leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1843 leader = new Leader(leaderActorContext);
1844 leader.getFollower(FOLLOWER_ID).markFollowerActive();
1845 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1846 assertTrue("Expected Leader", behavior instanceof Leader);
1849 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1850 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1851 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1853 MockRaftActorContext leaderActorContext = createActorContext();
1855 Map<String, String> peerAddresses = new HashMap<>();
1856 peerAddresses.put("follower-1", followerActor1.path().toString());
1857 peerAddresses.put("follower-2", followerActor2.path().toString());
1859 leaderActorContext.setPeerAddresses(peerAddresses);
1860 leaderActorContext.setRaftPolicy(raftPolicy);
1862 leader = new Leader(leaderActorContext);
1864 leader.markFollowerActive("follower-1");
1865 leader.markFollowerActive("follower-2");
1866 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1867 assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
1869 // kill 1 follower and verify if that got killed
1870 final JavaTestKit probe = new JavaTestKit(getSystem());
1871 probe.watch(followerActor1);
1872 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1873 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1874 assertEquals(termMsg1.getActor(), followerActor1);
1876 leader.markFollowerInActive("follower-1");
1877 leader.markFollowerActive("follower-2");
1878 behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1879 assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
1881 // kill 2nd follower and leader should change to Isolated leader
1882 followerActor2.tell(PoisonPill.getInstance(), null);
1883 probe.watch(followerActor2);
1884 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1885 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1886 assertEquals(termMsg2.getActor(), followerActor2);
1888 leader.markFollowerInActive("follower-2");
1889 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1893 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1894 logStart("testIsolatedLeaderCheckTwoFollowers");
1896 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1898 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1899 behavior instanceof IsolatedLeader);
1903 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1904 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1906 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1908 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1909 behavior instanceof Leader);
1913 public void testLaggingFollowerStarvation() throws Exception {
1914 logStart("testLaggingFollowerStarvation");
1915 new JavaTestKit(getSystem()) {{
1916 String leaderActorId = actorFactory.generateActorId("leader");
1917 String follower1ActorId = actorFactory.generateActorId("follower");
1918 String follower2ActorId = actorFactory.generateActorId("follower");
1920 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1921 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1922 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1923 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1925 MockRaftActorContext leaderActorContext =
1926 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1928 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1929 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1930 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1932 leaderActorContext.setConfigParams(configParams);
1934 leaderActorContext.setReplicatedLog(
1935 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1937 Map<String, String> peerAddresses = new HashMap<>();
1938 peerAddresses.put(follower1ActorId,
1939 follower1Actor.path().toString());
1940 peerAddresses.put(follower2ActorId,
1941 follower2Actor.path().toString());
1943 leaderActorContext.setPeerAddresses(peerAddresses);
1944 leaderActorContext.getTermInformation().update(1, leaderActorId);
1946 RaftActorBehavior leader = createBehavior(leaderActorContext);
1948 leaderActor.underlyingActor().setBehavior(leader);
1950 for(int i=1;i<6;i++) {
1951 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1952 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1953 assertTrue(newBehavior == leader);
1954 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1957 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1958 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1960 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1961 heartbeats.size() > 1);
1963 // Check if follower-2 got AppendEntries during this time and was not starved
1964 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1966 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1967 appendEntries.size() > 1);
1973 public void testReplicationConsensusWithNonVotingFollower() {
1974 logStart("testReplicationConsensusWithNonVotingFollower");
1976 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1977 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1978 new FiniteDuration(1000, TimeUnit.SECONDS));
1980 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1981 leaderActorContext.setCommitIndex(-1);
1983 String nonVotingFollowerId = "nonvoting-follower";
1984 TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1985 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1987 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1989 leader = new Leader(leaderActorContext);
1990 leaderActorContext.setCurrentBehavior(leader);
1992 // Ignore initial heartbeats
1993 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1994 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1996 MessageCollectorActor.clearMessages(followerActor);
1997 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1998 MessageCollectorActor.clearMessages(leaderActor);
2000 // Send a Replicate message and wait for AppendEntries.
2001 sendReplicate(leaderActorContext, 0);
2003 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2004 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2006 // Send reply only from the voting follower and verify consensus via ApplyState.
2007 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2009 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2011 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2013 MessageCollectorActor.clearMessages(followerActor);
2014 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2015 MessageCollectorActor.clearMessages(leaderActor);
2017 // Send another Replicate message
2018 sendReplicate(leaderActorContext, 1);
2020 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2021 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2022 AppendEntries.class);
2023 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2024 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2026 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2027 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2029 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2031 // Send reply from the voting follower and verify consensus.
2032 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2034 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2038 public void testTransferLeadershipWithFollowerInSync() {
2039 logStart("testTransferLeadershipWithFollowerInSync");
2041 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2042 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2043 new FiniteDuration(1000, TimeUnit.SECONDS));
2044 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2046 leader = new Leader(leaderActorContext);
2047 leaderActorContext.setCurrentBehavior(leader);
2049 // Initial heartbeat
2050 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2051 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2052 MessageCollectorActor.clearMessages(followerActor);
2054 sendReplicate(leaderActorContext, 0);
2055 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2057 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2058 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2059 MessageCollectorActor.clearMessages(followerActor);
2061 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2062 leader.transferLeadership(mockTransferCohort);
2064 verify(mockTransferCohort, never()).transferComplete();
2065 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2066 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2068 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2069 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2071 // Leader should force an election timeout
2072 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2074 verify(mockTransferCohort).transferComplete();
2078 public void testTransferLeadershipWithEmptyLog() {
2079 logStart("testTransferLeadershipWithEmptyLog");
2081 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2082 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2083 new FiniteDuration(1000, TimeUnit.SECONDS));
2084 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2086 leader = new Leader(leaderActorContext);
2087 leaderActorContext.setCurrentBehavior(leader);
2089 // Initial heartbeat
2090 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2091 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2092 MessageCollectorActor.clearMessages(followerActor);
2094 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2095 leader.transferLeadership(mockTransferCohort);
2097 verify(mockTransferCohort, never()).transferComplete();
2098 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2099 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2101 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2102 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2104 // Leader should force an election timeout
2105 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2107 verify(mockTransferCohort).transferComplete();
2111 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2112 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2114 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2115 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2116 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2118 leader = new Leader(leaderActorContext);
2119 leaderActorContext.setCurrentBehavior(leader);
2121 // Initial heartbeat
2122 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2123 MessageCollectorActor.clearMessages(followerActor);
2125 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2126 leader.transferLeadership(mockTransferCohort);
2128 verify(mockTransferCohort, never()).transferComplete();
2130 // Sync up the follower.
2131 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2132 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2133 MessageCollectorActor.clearMessages(followerActor);
2135 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2136 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2137 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2138 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2139 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2141 // Leader should force an election timeout
2142 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2144 verify(mockTransferCohort).transferComplete();
2148 public void testTransferLeadershipWithFollowerSyncTimeout() {
2149 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2151 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2152 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2153 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2154 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2155 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2157 leader = new Leader(leaderActorContext);
2158 leaderActorContext.setCurrentBehavior(leader);
2160 // Initial heartbeat
2161 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2162 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2163 MessageCollectorActor.clearMessages(followerActor);
2165 sendReplicate(leaderActorContext, 0);
2166 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2168 MessageCollectorActor.clearMessages(followerActor);
2170 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2171 leader.transferLeadership(mockTransferCohort);
2173 verify(mockTransferCohort, never()).transferComplete();
2175 // Send heartbeats to time out the transfer.
2176 for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2177 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2178 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2179 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2182 verify(mockTransferCohort).abortTransfer();
2183 verify(mockTransferCohort, never()).transferComplete();
2184 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2188 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2189 ActorRef actorRef, RaftRPC rpc) throws Exception {
2190 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2191 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2194 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2196 private final long electionTimeOutIntervalMillis;
2197 private final int snapshotChunkSize;
2199 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2201 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2202 this.snapshotChunkSize = snapshotChunkSize;
2206 public FiniteDuration getElectionTimeOutInterval() {
2207 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2211 public int getSnapshotChunkSize() {
2212 return snapshotChunkSize;