2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorRef;
20 import akka.actor.PoisonPill;
21 import akka.actor.Props;
22 import akka.actor.Terminated;
23 import akka.testkit.JavaTestKit;
24 import akka.testkit.TestActorRef;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.google.protobuf.ByteString;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.List;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
37 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
38 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
39 import org.opendaylight.controller.cluster.raft.RaftActorContext;
40 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
41 import org.opendaylight.controller.cluster.raft.RaftState;
42 import org.opendaylight.controller.cluster.raft.RaftVersions;
43 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
44 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
45 import org.opendaylight.controller.cluster.raft.SerializationUtils;
46 import org.opendaylight.controller.cluster.raft.Snapshot;
47 import org.opendaylight.controller.cluster.raft.VotingState;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
49 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
50 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
51 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
52 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
54 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
55 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
60 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
61 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
62 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
63 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
64 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
65 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
66 import scala.concurrent.duration.FiniteDuration;
68 public class LeaderTest extends AbstractLeaderTest<Leader> {
// Follower id used in the AppendEntriesReply messages and getFollower() lookups throughout these tests.
70 static final String FOLLOWER_ID = "follower";
// Leader id constant; not referenced in the visible part of this file.
71 public static final String LEADER_ID = "leader";
// Test actor used as the sender/target when messages are handed to the leader behavior
// (e.g. SendHeartBeat, Replicate); tests also read ApplyState messages collected by it.
73 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
74 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
// Test actor standing in for the follower; tests inspect the AppendEntries/InstallSnapshot messages
// it received via MessageCollectorActor and reset it with underlyingActor().clear().
76 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
77 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
// Leader behavior under test; every test assigns a fresh instance.
79 private Leader leader;
// Payload version constant (5).
80 private final short payloadVersion = 5;
// Per-test clean-up hook. NOTE(review): presumably the JUnit @After method (org.junit.After is
// imported) - neither the annotation nor the body is visible in this listing, so confirm against the file.
84 public void tearDown() throws Exception {
/**
 * Verifies that {@code Leader.handleMessage} returns {@code null} when it is given a message type it
 * does not recognise (here the plain string "foo").
 */
93 public void testHandleMessageForUnknownMessage() throws Exception {
94 logStart("testHandleMessageForUnknownMessage");
96 leader = new Leader(createActorContext());
98 // handleMessage should return null when it receives an unknown message
99 assertNull(leader.handleMessage(followerActor, "foo"));
/**
 * Verifies the leader's heartbeat traffic to its follower: an immediate AppendEntries with no entries
 * right after the Leader is constructed, and - after the follower has replied and a SendHeartBeat is
 * handled - an AppendEntries that carries the leader's last log entry.
 *
 * NOTE(review): {@code term} is used below but its declaration is not among the visible lines - confirm.
 */
103 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
104 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
106 MockRaftActorContext actorContext = createActorContextWithFollower();
// This local shadows the payloadVersion field, which holds the same value (5).
107 short payloadVersion = (short)5;
108 actorContext.setPayloadVersion(payloadVersion);
111 actorContext.getTermInformation().update(term, "");
113 leader = new Leader(actorContext);
114 actorContext.setCurrentBehavior(leader);
116 // Leader should send an immediate heartbeat with no entries as follower is inactive.
117 long lastIndex = actorContext.getReplicatedLog().lastIndex();
118 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
119 assertEquals("getTerm", term, appendEntries.getTerm());
120 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
121 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
122 assertEquals("Entries size", 0, appendEntries.getEntries().size());
123 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
125 // The follower would normally reply - simulate that explicitly here.
// The reply reports lastIndex - 1 as the follower's log index, i.e. the follower is one entry behind.
126 leader.handleMessage(followerActor, new AppendEntriesReply(
127 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
128 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the messages collected so far so the next expectFirstMatching only sees new traffic.
130 followerActor.underlyingActor().clear();
132 // Sleep for the heartbeat interval so AppendEntries is sent.
133 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
134 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
136 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
// The heartbeat is expected to carry exactly the one entry (at lastIndex) the follower is missing.
138 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
139 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
140 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
141 assertEquals("Entries size", 1, appendEntries.getEntries().size());
142 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
143 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
144 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
/**
 * Convenience overload: replicates a new entry at {@code index} using term 1.
 *
 * @param actorContext context whose replicated log receives the entry
 * @param index index of the new log entry
 * @return the behavior returned by the three-argument overload
 */
148 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
149 return sendReplicate(actorContext, 1, index);
/**
 * Appends a new mock log entry (payload "foo") with the given term and index to the context's
 * replicated log, then hands the leader a Replicate message for that entry.
 *
 * @param actorContext context whose replicated log receives the entry
 * @param term term of the new log entry
 * @param index index of the new log entry
 * @return whatever {@code leader.handleMessage} returns for the Replicate message
 */
152 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
153 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
154 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
155 term, index, payload);
156 actorContext.getReplicatedLog().append(newEntry);
// The Replicate message is built with null for its first two arguments (client actor and identifier
// in the other call sites of this file).
157 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
/**
 * Verifies that handling a Replicate message makes the leader send the new entry to an active,
 * up-to-date follower in an AppendEntries, that the returned behavior is still a Leader, and that the
 * commit index is still {@code lastIndex} at that point (no follower reply for the new entry is simulated).
 *
 * NOTE(review): {@code term} is used below but its declaration is not among the visible lines - confirm.
 */
161 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
162 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
164 MockRaftActorContext actorContext = createActorContextWithFollower();
167 actorContext.getTermInformation().update(term, "");
169 leader = new Leader(actorContext);
171 // Leader will send an immediate heartbeat - ignore it.
172 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
174 // The follower would normally reply - simulate that explicitly here.
175 long lastIndex = actorContext.getReplicatedLog().lastIndex();
176 leader.handleMessage(followerActor, new AppendEntriesReply(
177 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
178 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so only the AppendEntries triggered by the Replicate is inspected.
180 followerActor.underlyingActor().clear();
182 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
184 // State should not change
185 assertTrue(raftBehavior instanceof Leader);
187 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
188 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
189 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
190 assertEquals("Entries size", 1, appendEntries.getEntries().size());
191 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
192 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
193 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
194 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
/**
 * Verifies the commit rule for entries from an earlier term: a successful follower reply covering only
 * previous-term entries must leave the commit index at -1, and the commit index advances to the new
 * entry's index only after an entry from the current term has been replicated and acknowledged.
 */
198 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
199 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
201 MockRaftActorContext actorContext = createActorContextWithFollower();
203 // The raft context is initialized with a couple log entries. However the commitIndex
204 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
205 // committed and applied. Now it regains leadership with a higher term (2).
206 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
207 long newTerm = prevTerm + 1;
208 actorContext.getTermInformation().update(newTerm, "");
210 leader = new Leader(actorContext);
211 actorContext.setCurrentBehavior(leader);
213 // Leader will send an immediate heartbeat - ignore it.
214 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
216 // The follower replies with the leader's current last index and term, simulating that it is
217 // up to date with the leader.
218 long lastIndex = actorContext.getReplicatedLog().lastIndex();
219 leader.handleMessage(followerActor, new AppendEntriesReply(
220 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
222 // The commit index should not get updated even though consensus was reached. This is because the
223 // last entry's term does not match the current term. As per §5.4.1, "Raft never commits log entries
224 // from previous terms by counting replicas".
225 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
227 followerActor.underlyingActor().clear();
229 // Now replicate a new entry with the new term 2.
230 long newIndex = lastIndex + 1;
231 sendReplicate(actorContext, newTerm, newIndex);
233 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
234 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
235 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
236 assertEquals("Entries size", 1, appendEntries.getEntries().size());
237 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
238 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
239 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
241 // The follower replies with success. The leader should now update the commit index to the new index
242 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
243 // prior entries are committed indirectly".
244 leader.handleMessage(followerActor, new AppendEntriesReply(
245 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
247 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
/**
 * Same scenario as the plain replicate test, but with the raft policy returned by
 * {@code createRaftPolicy(true, true)} installed: the commit index is expected to be
 * {@code lastIndex + 1} immediately after the Replicate is handled, with no follower reply for the
 * new entry having been simulated.
 *
 * NOTE(review): the meaning of the two boolean policy arguments and the declaration of {@code term}
 * are not visible in this listing - confirm.
 */
251 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
252 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
254 MockRaftActorContext actorContext = createActorContextWithFollower();
255 actorContext.setRaftPolicy(createRaftPolicy(true, true));
258 actorContext.getTermInformation().update(term, "");
260 leader = new Leader(actorContext);
262 // Leader will send an immediate heartbeat - ignore it.
263 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
265 // The follower would normally reply - simulate that explicitly here.
266 long lastIndex = actorContext.getReplicatedLog().lastIndex();
267 leader.handleMessage(followerActor, new AppendEntriesReply(
268 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
269 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
271 followerActor.underlyingActor().clear();
273 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
275 // State should not change
276 assertTrue(raftBehavior instanceof Leader);
278 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
279 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
280 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
281 assertEquals("Entries size", 1, appendEntries.getEntries().size());
282 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
283 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
284 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// Unlike the default-policy test, the commit index has already moved to the new entry.
285 assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
/**
 * Verifies that five back-to-back Replicate messages, sent without any follower reply in between and
 * within a 5 second heartbeat interval, result in only a single AppendEntries reaching the follower.
 *
 * NOTE(review): {@code term} is used below but its declaration is not among the visible lines - confirm.
 */
289 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
290 logStart("testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent");
292 MockRaftActorContext actorContext = createActorContextWithFollower();
// Long heartbeat interval so that no heartbeat-driven AppendEntries can interfere with the count.
293 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
295 public FiniteDuration getHeartBeatInterval() {
296 return FiniteDuration.apply(5, TimeUnit.SECONDS);
301 actorContext.getTermInformation().update(term, "");
303 leader = new Leader(actorContext);
305 // Leader will send an immediate heartbeat - ignore it.
306 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
308 // The follower would normally reply - simulate that explicitly here.
309 long lastIndex = actorContext.getReplicatedLog().lastIndex();
310 leader.handleMessage(followerActor, new AppendEntriesReply(
311 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
312 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
314 followerActor.underlyingActor().clear();
316 for(int i=0;i<5;i++) {
317 sendReplicate(actorContext, lastIndex+i+1);
320 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
321 // We expect only 1 message to be sent because of two reasons,
322 // - an append entries reply was not received
323 // - the heartbeat interval has not expired
324 // In this scenario if multiple messages are sent they would likely be duplicates
325 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
/**
 * Verifies that when the follower acknowledges each of the first three replicated entries, each
 * Replicate produces its own AppendEntries, and that of two further un-acknowledged Replicates only
 * the first is sent - four AppendEntries in total, whose first entries have consecutive indices.
 *
 * NOTE(review): {@code term} is used below but its declaration is not among the visible lines - confirm.
 */
329 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
330 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
332 MockRaftActorContext actorContext = createActorContextWithFollower();
// Long heartbeat interval so that no heartbeat-driven AppendEntries can interfere with the count.
333 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
335 public FiniteDuration getHeartBeatInterval() {
336 return FiniteDuration.apply(5, TimeUnit.SECONDS);
341 actorContext.getTermInformation().update(term, "");
343 leader = new Leader(actorContext);
345 // Leader will send an immediate heartbeat - ignore it.
346 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
348 // The follower would normally reply - simulate that explicitly here.
349 long lastIndex = actorContext.getReplicatedLog().lastIndex();
350 leader.handleMessage(followerActor, new AppendEntriesReply(
351 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
352 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
354 followerActor.underlyingActor().clear();
// First three entries: each one is acknowledged by the follower before the next is replicated.
356 for(int i=0;i<3;i++) {
357 sendReplicate(actorContext, lastIndex+i+1);
358 leader.handleMessage(followerActor, new AppendEntriesReply(
359 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
// Last two entries: replicated without any acknowledgement from the follower.
363 for(int i=3;i<5;i++) {
364 sendReplicate(actorContext, lastIndex + i + 1);
367 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
368 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
369 // get sent to the follower - but not the 5th
370 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
372 for(int i=0;i<4;i++) {
// assertEquals takes (expected, actual): the observed entry index goes second so that a failure
// message reports the two values the right way round.
373 long entryIndex = allMessages.get(i).getEntries().get(0).getIndex();
374 assertEquals(i+2, entryIndex);
/**
 * Verifies that an entry which was sent to the follower but not yet acknowledged is sent again when
 * the next heartbeat fires: two AppendEntries are collected, and each must carry the single entry at
 * {@code lastIndex + 1}.
 *
 * NOTE(review): {@code term} is used below but its declaration is not among the visible lines - confirm.
 */
379 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
380 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
382 MockRaftActorContext actorContext = createActorContextWithFollower();
383 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
385 public FiniteDuration getHeartBeatInterval() {
386 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
391 actorContext.getTermInformation().update(term, "");
393 leader = new Leader(actorContext);
395 // Leader will send an immediate heartbeat - ignore it.
396 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
398 // The follower would normally reply - simulate that explicitly here.
399 long lastIndex = actorContext.getReplicatedLog().lastIndex();
400 leader.handleMessage(followerActor, new AppendEntriesReply(
401 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
402 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
404 followerActor.underlyingActor().clear();
406 sendReplicate(actorContext, lastIndex+1);
408 // Wait slightly longer than heartbeat duration
409 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
411 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
413 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
414 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// Message 0 is the AppendEntries triggered by the Replicate.
416 assertEquals(1, allMessages.get(0).getEntries().size());
417 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
// Message 1 is the duplicate sent on the heartbeat; check its own entry, not message 0's again.
418 assertEquals(1, allMessages.get(1).getEntries().size());
419 assertEquals(lastIndex+1, allMessages.get(1).getEntries().get(0).getIndex());
/**
 * Verifies that every SendHeartBeat handled after the (100 ms) heartbeat interval has elapsed
 * produces an AppendEntries to the follower: three heartbeats, each preceded by a 150 ms sleep,
 * must yield three collected messages.
 *
 * NOTE(review): {@code term} is used below but its declaration is not among the visible lines - confirm.
 */
424 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
425 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
427 MockRaftActorContext actorContext = createActorContextWithFollower();
428 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
430 public FiniteDuration getHeartBeatInterval() {
431 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
436 actorContext.getTermInformation().update(term, "");
438 leader = new Leader(actorContext);
440 // Leader will send an immediate heartbeat - ignore it.
441 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
443 // The follower would normally reply - simulate that explicitly here.
444 long lastIndex = actorContext.getReplicatedLog().lastIndex();
445 leader.handleMessage(followerActor, new AppendEntriesReply(
446 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
447 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
449 followerActor.underlyingActor().clear();
// Sleep longer than the 100 ms heartbeat interval before each heartbeat.
451 for(int i=0;i<3;i++) {
452 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
453 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
456 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
457 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
/**
 * Verifies that a Replicate handled directly after a heartbeat is still sent to the follower: the
 * follower must receive two AppendEntries, the first (heartbeat) empty and the second carrying one entry.
 *
 * NOTE(review): {@code term} is used below but its declaration is not among the visible lines - confirm.
 */
461 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
462 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
464 MockRaftActorContext actorContext = createActorContextWithFollower();
465 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
467 public FiniteDuration getHeartBeatInterval() {
468 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
473 actorContext.getTermInformation().update(term, "");
475 leader = new Leader(actorContext);
477 // Leader will send an immediate heartbeat - ignore it.
478 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
480 // The follower would normally reply - simulate that explicitly here.
481 long lastIndex = actorContext.getReplicatedLog().lastIndex();
482 leader.handleMessage(followerActor, new AppendEntriesReply(
483 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
484 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
486 followerActor.underlyingActor().clear();
// Sleep longer than the 100 ms heartbeat interval, then send the heartbeat followed at once by a Replicate.
488 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
489 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
490 sendReplicate(actorContext, lastIndex+1);
492 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
493 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
495 assertEquals(0, allMessages.get(0).getEntries().size());
496 assertEquals(1, allMessages.get(1).getEntries().size());
/**
 * Verifies replication on a leader created from a context without a follower: after the Replicate is
 * handled the commit index equals the new entry's index, and the leader actor has received ApplyState
 * messages for every entry from index 1 up to the new one, the last carrying the new entry's data
 * and the identifier "state-id".
 */
501 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
502 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
504 MockRaftActorContext actorContext = createActorContext();
506 leader = new Leader(actorContext);
// Pretend index 0 has already been applied, so application resumes from index 1.
508 actorContext.setLastApplied(0);
510 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
511 long term = actorContext.getTermInformation().getCurrentTerm();
512 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
513 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
515 actorContext.getReplicatedLog().append(newEntry);
517 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
518 new Replicate(leaderActor, "state-id", newEntry));
520 // State should not change
521 assertTrue(raftBehavior instanceof Leader);
523 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
525 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
526 // one since lastApplied state is 0.
527 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
528 leaderActor, ApplyState.class);
529 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
// The i-th ApplyState is expected to be for log index i + 1 (index 0 counts as already applied).
531 for(int i = 0; i <= newLogIndex - 1; i++ ) {
532 ApplyState applyState = applyStateList.get(i);
533 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
534 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
537 ApplyState last = applyStateList.get((int) newLogIndex - 1);
538 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
539 assertEquals("getIdentifier", "state-id", last.getIdentifier());
/**
 * Verifies heartbeat behaviour while a snapshot install to the follower is in progress: while the
 * first chunk is still unacknowledged, a heartbeat produces an AppendEntries with no entries; once the
 * chunk is marked as successfully sent, the next heartbeat produces an InstallSnapshot whose
 * last-included index equals the commit index.
 */
543 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
544 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
546 MockRaftActorContext actorContext = createActorContextWithFollower();
// State map that is serialized (via toByteString) to form the snapshot bytes.
548 Map<String, String> leadersSnapshot = new HashMap<>();
549 leadersSnapshot.put("1", "A");
550 leadersSnapshot.put("2", "B");
551 leadersSnapshot.put("3", "C");
// Discard the log entries the mock context was created with.
554 actorContext.getReplicatedLog().removeFrom(0);
556 final int commitIndex = 3;
557 final int snapshotIndex = 2;
558 final int newEntryIndex = 4;
559 final int snapshotTerm = 1;
560 final int currentTerm = 2;
562 // set the snapshot variables in replicatedlog
563 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
564 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
565 actorContext.setCommitIndex(commitIndex);
566 //set follower timeout to 2 mins, helps during debugging
567 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
569 leader = new Leader(actorContext);
// Put the follower at the very start of the log: nothing matched, next index 0.
571 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
572 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// NOTE(review): `entry` is not referenced by any visible line of this method - possibly unused; confirm.
575 ReplicatedLogImplEntry entry =
576 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
577 new MockRaftActorContext.MockPayload("D"));
579 //update follower timestamp
580 leader.markFollowerActive(FOLLOWER_ID);
// Install the snapshot on the leader and register an in-progress transfer state for the follower.
582 ByteString bs = toByteString(leadersSnapshot);
583 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
584 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
585 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
586 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
588 //send first chunk and no InstallSnapshotReply received yet
590 fts.incrementChunkIndex();
592 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
593 TimeUnit.MILLISECONDS);
595 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
597 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
599 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
601 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
603 //InstallSnapshotReply received
604 fts.markSendStatus(true);
606 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
608 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
610 assertEquals(commitIndex, is.getLastIncludedIndex());
/**
 * Verifies that replicating a new entry while the leader's log has been emptied and its snapshot
 * index (3) is ahead of the commit index set for the test (2) leaves the behavior a Leader and puts
 * the snapshot manager into the capturing state.
 */
614 public void testSendAppendEntriesSnapshotScenario() throws Exception {
615 logStart("testSendAppendEntriesSnapshotScenario");
617 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): leadersSnapshot is populated but not referenced again in the visible lines - confirm.
619 Map<String, String> leadersSnapshot = new HashMap<>();
620 leadersSnapshot.put("1", "A");
621 leadersSnapshot.put("2", "B");
622 leadersSnapshot.put("3", "C");
// Discard the log entries the mock context was created with.
625 actorContext.getReplicatedLog().removeFrom(0);
627 final int followersLastIndex = 2;
628 final int snapshotIndex = 3;
629 final int newEntryIndex = 4;
630 final int snapshotTerm = 1;
631 final int currentTerm = 2;
633 // set the snapshot variables in replicatedlog
634 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
635 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
636 actorContext.setCommitIndex(followersLastIndex);
638 leader = new Leader(actorContext);
640 // Leader will send an immediate heartbeat - ignore it.
641 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
644 ReplicatedLogImplEntry entry =
645 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
646 new MockRaftActorContext.MockPayload("D"));
648 actorContext.getReplicatedLog().append(entry);
650 //update follower timestamp
651 leader.markFollowerActive(FOLLOWER_ID);
653 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
654 RaftActorBehavior raftBehavior = leader.handleMessage(
655 leaderActor, new Replicate(null, "state-id", entry));
657 assertTrue(raftBehavior instanceof Leader);
659 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
/**
 * Verifies that, with no snapshot held by the leader, replicating a new entry starts a snapshot
 * capture flagged as install-snapshot-initiated with the expected last-applied (3, term 1) and last
 * (4, term 2) index/term values, and that a second Replicate while that capture is in progress does
 * not replace the CaptureSnapshot instance.
 */
663 public void testInitiateInstallSnapshot() throws Exception {
664 logStart("testInitiateInstallSnapshot");
666 MockRaftActorContext actorContext = createActorContextWithFollower();
// Discard the log entries the mock context was created with.
669 actorContext.getReplicatedLog().removeFrom(0);
671 final int followersLastIndex = 2;
672 final int snapshotIndex = 3;
673 final int newEntryIndex = 4;
674 final int snapshotTerm = 1;
675 final int currentTerm = 2;
677 // set the snapshot variables in replicatedlog
678 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
679 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
680 actorContext.setLastApplied(3);
681 actorContext.setCommitIndex(followersLastIndex);
683 leader = new Leader(actorContext);
685 // Leader will send an immediate heartbeat - ignore it.
686 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
688 // set the snapshot as absent and check if capture-snapshot is invoked.
689 leader.setSnapshot(null);
692 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
693 new MockRaftActorContext.MockPayload("D"));
695 actorContext.getReplicatedLog().append(entry);
697 //update follower timestamp
698 leader.markFollowerActive(FOLLOWER_ID);
700 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
702 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
704 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
// Last-applied values match setLastApplied(3)/snapshotTerm; last index/term match the appended entry.
706 assertTrue(cs.isInstallSnapshotInitiated());
707 assertEquals(3, cs.getLastAppliedIndex());
708 assertEquals(1, cs.getLastAppliedTerm());
709 assertEquals(4, cs.getLastIndex());
710 assertEquals(2, cs.getLastTerm());
712 // if an initiate is started again while the first is in progress, it shouldn't initiate another capture
713 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
715 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
/**
 * Verifies that an unsuccessful AppendEntriesReply built with a trailing {@code true} argument makes
 * the leader start a snapshot capture flagged as install-snapshot-initiated, with the expected
 * last-applied (3, term 1) and last (4, term 2) index/term values, and that a subsequent Replicate
 * while that capture is in progress does not replace the CaptureSnapshot instance.
 *
 * NOTE(review): the trailing boolean of AppendEntriesReply is presumably the force-install-snapshot
 * flag (per the test name and the comment below) - confirm against the message class.
 */
719 public void testInitiateForceInstallSnapshot() throws Exception {
720 logStart("testInitiateForceInstallSnapshot");
722 MockRaftActorContext actorContext = createActorContextWithFollower();
724 final int followersLastIndex = 2;
725 final int snapshotIndex = -1;
726 final int newEntryIndex = 4;
727 final int snapshotTerm = -1;
728 final int currentTerm = 2;
730 // set the snapshot variables in replicatedlog
731 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
732 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
733 actorContext.setLastApplied(3);
734 actorContext.setCommitIndex(followersLastIndex);
// Discard the log entries the mock context was created with; the log is rebuilt below.
736 actorContext.getReplicatedLog().removeFrom(0);
738 leader = new Leader(actorContext);
740 // Leader will send an immediate heartbeat - ignore it.
741 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
743 // set the snapshot as absent and check if capture-snapshot is invoked.
744 leader.setSnapshot(null);
// Rebuild the log with four entries (payloads X0..X3) created with arguments (i, 1, payload).
746 for(int i=0;i<4;i++) {
747 actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
748 new MockRaftActorContext.MockPayload("X" + i)));
752 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
753 new MockRaftActorContext.MockPayload("D"));
755 actorContext.getReplicatedLog().append(entry);
757 //update follower timestamp
758 leader.markFollowerActive(FOLLOWER_ID);
760 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
761 // installed with a SendInstallSnapshot
762 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
764 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
766 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
768 assertTrue(cs.isInstallSnapshotInitiated());
769 assertEquals(3, cs.getLastAppliedIndex());
770 assertEquals(1, cs.getLastAppliedTerm());
771 assertEquals(4, cs.getLastIndex());
772 assertEquals(2, cs.getLastTerm());
774 // if an initiate is started again while the first is in progress, it shouldn't initiate another capture
775 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
777 assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
/**
 * Verifies that handling a SendInstallSnapshot message makes the leader send an InstallSnapshot
 * to the follower, carrying the snapshot's last index/term and the leader's current term.
 */
782 public void testInstallSnapshot() throws Exception {
783 logStart("testInstallSnapshot");
785 MockRaftActorContext actorContext = createActorContextWithFollower();
// State that is serialized and used as the snapshot payload.
787 Map<String, String> leadersSnapshot = new HashMap<>();
788 leadersSnapshot.put("1", "A");
789 leadersSnapshot.put("2", "B");
790 leadersSnapshot.put("3", "C");
// Remove all log entries from index 0 onwards.
793 actorContext.getReplicatedLog().removeFrom(0);
795 final int lastAppliedIndex = 3;
796 final int snapshotIndex = 2;
797 final int snapshotTerm = 1;
798 final int currentTerm = 2;
800 // set the snapshot variables in the replicated log
801 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
802 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
// Current term becomes 2; the InstallSnapshot's term is asserted against it at the end.
803 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
804 actorContext.setCommitIndex(lastAppliedIndex);
805 actorContext.setLastApplied(lastAppliedIndex);
807 leader = new Leader(actorContext);
809 // Initial heartbeat.
810 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Reset the leader's view of the follower: nothing matched yet, next index 0.
812 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
813 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Snapshot with no unapplied entries; index 3 / term 1 are passed for both index/term pairs.
815 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
816 Collections.<ReplicatedLogEntry>emptyList(),
817 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
819 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
// Handling the message must not change the behavior.
821 assertTrue(raftBehavior instanceof Leader);
823 // check that InstallSnapshot is sent to the follower with the correct values.
825 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
827 assertNotNull(installSnapshot.getData());
828 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
829 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
831 assertEquals(currentTerm, installSnapshot.getTerm());
/**
 * Same flow as testInstallSnapshot, but with the replicated log's snapshot index/term set to -1
 * and the follower's next index set to -1: handling SendInstallSnapshot must still send an
 * InstallSnapshot with the snapshot's values to the follower.
 */
835 public void testForceInstallSnapshot() throws Exception {
836 logStart("testForceInstallSnapshot");
838 MockRaftActorContext actorContext = createActorContextWithFollower();
// State that is serialized and used as the snapshot payload.
840 Map<String, String> leadersSnapshot = new HashMap<>();
841 leadersSnapshot.put("1", "A");
842 leadersSnapshot.put("2", "B");
843 leadersSnapshot.put("3", "C");
845 final int lastAppliedIndex = 3;
846 final int snapshotIndex = -1;
847 final int snapshotTerm = -1;
848 final int currentTerm = 2;
850 // set the snapshot variables in the replicated log
851 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
852 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
// Current term becomes 2; the InstallSnapshot's term is asserted against it at the end.
853 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
854 actorContext.setCommitIndex(lastAppliedIndex);
855 actorContext.setLastApplied(lastAppliedIndex);
857 leader = new Leader(actorContext);
859 // Initial heartbeat.
860 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Unlike testInstallSnapshot, the follower's next index is -1 here rather than 0.
862 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
863 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
// Note the snapshot itself is created with snapshotTerm (-1) as its term values.
865 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
866 Collections.<ReplicatedLogEntry>emptyList(),
867 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
869 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
// Handling the message must not change the behavior.
871 assertTrue(raftBehavior instanceof Leader);
873 // check that InstallSnapshot is sent to the follower with the correct values.
875 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
877 assertNotNull(installSnapshot.getData());
878 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
879 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
881 assertEquals(currentTerm, installSnapshot.getTerm());
/**
 * Verifies the leader's bookkeeping after a successful InstallSnapshotReply for the last chunk:
 * the follower's snapshot-transfer state is removed and its match/next indices are moved to the
 * snapshot's index and the index after it.
 */
885 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
886 logStart("testHandleInstallSnapshotReplyLastChunk");
888 MockRaftActorContext actorContext = createActorContextWithFollower();
890 final int commitIndex = 3;
891 final int snapshotIndex = 2;
892 final int snapshotTerm = 1;
893 final int currentTerm = 2;
895 actorContext.setCommitIndex(commitIndex);
897 leader = new Leader(actorContext);
898 actorContext.setCurrentBehavior(leader);
// Reset the leader's view of the follower: nothing matched yet, next index 0.
900 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
901 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
903 // Ignore initial heartbeat.
904 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// State that is serialized and used as the snapshot payload.
906 Map<String, String> leadersSnapshot = new HashMap<>();
907 leadersSnapshot.put("1", "A");
908 leadersSnapshot.put("2", "B");
909 leadersSnapshot.put("3", "C");
911 // set the snapshot variables in the replicated log
913 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
914 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
915 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
917 ByteString bs = toByteString(leadersSnapshot);
// Install the snapshot on the leader and register an in-progress transfer for the follower.
918 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
919 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
920 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
921 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
// Advance the transfer state until its chunk index points at the last chunk.
922 while(!fts.isLastChunk(fts.getChunkIndex())) {
924 fts.incrementChunkIndex();
// Remove all log entries from index 0 onwards before the reply is handled.
928 actorContext.getReplicatedLog().removeFrom(0);
// Successful reply for the last chunk.
930 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
931 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
933 assertTrue(raftBehavior instanceof Leader);
// The transfer entry is gone while the follower itself is still tracked.
935 assertEquals(0, leader.followerSnapshotSize());
936 assertEquals(1, leader.followerLogSize());
937 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
// The snapshot was created with commitIndex as its index, so match = 3 and next = 4.
939 assertEquals(commitIndex, fli.getMatchIndex());
940 assertEquals(commitIndex + 1, fli.getNextIndex());
/**
 * Verifies chunked snapshot transfer driven by replies: each successful InstallSnapshotReply makes
 * the leader send the next chunk (1, then 2, then a further one), and a reply to that final chunk
 * does not trigger another InstallSnapshot.
 */
944 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
945 logStart("testSendSnapshotfromInstallSnapshotReply");
947 MockRaftActorContext actorContext = createActorContextWithFollower();
949 final int commitIndex = 3;
950 final int snapshotIndex = 2;
951 final int snapshotTerm = 1;
952 final int currentTerm = 2;
// Config with an overridden snapshot chunk size; the assertions below expect the snapshot to be
// split into 3 chunks.
954 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
956 public int getSnapshotChunkSize() {
// NOTE(review): presumably these long intervals keep periodic heartbeat / isolated-leader checks
// from firing during the test - confirm.
960 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
961 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
963 actorContext.setConfigParams(configParams);
964 actorContext.setCommitIndex(commitIndex);
966 leader = new Leader(actorContext);
967 actorContext.setCurrentBehavior(leader);
// Reset the leader's view of the follower: nothing matched yet, next index 0.
969 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
970 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// State that is serialized and used as the snapshot payload.
972 Map<String, String> leadersSnapshot = new HashMap<>();
973 leadersSnapshot.put("1", "A");
974 leadersSnapshot.put("2", "B");
975 leadersSnapshot.put("3", "C");
977 // set the snapshot variables in the replicated log
978 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
979 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
980 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
982 ByteString bs = toByteString(leadersSnapshot);
983 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
984 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
985 leader.setSnapshot(snapshot);
// Kick off the transfer; the first chunk must be sent immediately.
987 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
989 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
991 assertEquals(1, installSnapshot.getChunkIndex());
992 assertEquals(3, installSnapshot.getTotalChunks());
// Reset the follower's collected messages so the next lookup only sees messages sent from here on,
// then acknowledge chunk 1.
994 followerActor.underlyingActor().clear();
995 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
996 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
998 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1000 assertEquals(2, installSnapshot.getChunkIndex());
1001 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 2; another InstallSnapshot is expected (its fields are not asserted).
1003 followerActor.underlyingActor().clear();
1004 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1005 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1007 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1009 // Send snapshot reply one more time and make sure that no new snapshot message is sent to the follower
1010 followerActor.underlyingActor().clear();
1011 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1012 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
// getFirstMatching (rather than expectFirstMatching) is used so that "no message" yields null.
1014 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1016 assertNull(installSnapshot);
/**
 * Verifies that an unsuccessful InstallSnapshotReply with chunk index -1 makes the leader restart
 * the transfer: after the next SendHeartBeat the follower receives chunk 1 of 3 again.
 */
1021 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
1022 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1024 MockRaftActorContext actorContext = createActorContextWithFollower();
1026 final int commitIndex = 3;
1027 final int snapshotIndex = 2;
1028 final int snapshotTerm = 1;
1029 final int currentTerm = 2;
// Config with an overridden snapshot chunk size; the assertions below expect 3 chunks in total.
1031 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
1033 public int getSnapshotChunkSize() {
1038 actorContext.setCommitIndex(commitIndex);
1040 leader = new Leader(actorContext);
// Reset the leader's view of the follower: nothing matched yet, next index 0.
1042 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1043 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// State that is serialized and used as the snapshot payload.
1045 Map<String, String> leadersSnapshot = new HashMap<>();
1046 leadersSnapshot.put("1", "A");
1047 leadersSnapshot.put("2", "B");
1048 leadersSnapshot.put("3", "C");
1050 // set the snapshot variables in the replicated log
1051 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1052 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1053 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1055 ByteString bs = toByteString(leadersSnapshot);
1056 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1057 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1058 leader.setSnapshot(snapshot);
// NOTE(review): the reason for this fixed 1 s pause is not evident from the test itself -
// confirm it is still required before the transfer is started.
1060 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1061 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1063 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1065 assertEquals(1, installSnapshot.getChunkIndex());
1066 assertEquals(3, installSnapshot.getTotalChunks());
// Reset the follower's collected messages so the lookup below only sees the re-sent chunk.
1068 followerActor.underlyingActor().clear();
// Failure reply carrying chunk index -1.
1070 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1071 FOLLOWER_ID, -1, false));
// Wait one configured heartbeat interval before triggering the heartbeat by hand.
1073 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1074 TimeUnit.MILLISECONDS);
1076 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1078 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
// The transfer starts over from the first chunk.
1080 assertEquals(1, installSnapshot.getChunkIndex());
1081 assertEquals(3, installSnapshot.getTotalChunks());
/**
 * Verifies the last-chunk hash code carried by InstallSnapshot messages: the first chunk carries
 * AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, and the second chunk carries Arrays.hashCode of the
 * first chunk's data.
 */
1085 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1086 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1088 MockRaftActorContext actorContext = createActorContextWithFollower();
1090 final int commitIndex = 3;
1091 final int snapshotIndex = 2;
1092 final int snapshotTerm = 1;
1093 final int currentTerm = 2;
// Config with an overridden snapshot chunk size; the assertions below expect 3 chunks in total.
1095 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1097 public int getSnapshotChunkSize() {
1102 actorContext.setCommitIndex(commitIndex);
1104 leader = new Leader(actorContext);
// Reset the leader's view of the follower: nothing matched yet, next index 0.
1106 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1107 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// State that is serialized and used as the snapshot payload.
1109 Map<String, String> leadersSnapshot = new HashMap<>();
1110 leadersSnapshot.put("1", "A");
1111 leadersSnapshot.put("2", "B");
1112 leadersSnapshot.put("3", "C");
1114 // set the snapshot variables in the replicated log
1115 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1116 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1117 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1119 ByteString bs = toByteString(leadersSnapshot);
1120 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1121 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1122 leader.setSnapshot(snapshot);
1124 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1126 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
// First chunk: there is no previous chunk, so the initial hash code constant is expected.
1128 assertEquals(1, installSnapshot.getChunkIndex());
1129 assertEquals(3, installSnapshot.getTotalChunks());
1130 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
// Remember the hash of chunk 1's bytes; chunk 2 must report exactly this value.
1132 int hashCode = Arrays.hashCode(installSnapshot.getData());
// Reset the follower's collected messages so the lookup below only sees the second chunk.
1134 followerActor.underlyingActor().clear();
// Acknowledge chunk 1 successfully.
1136 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1137 FOLLOWER_ID, 1, true));
1139 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1141 assertEquals(2, installSnapshot.getChunkIndex());
1142 assertEquals(3, installSnapshot.getTotalChunks());
1143 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
/**
 * Exercises FollowerToSnapshot's chunking directly: walks the serialized snapshot in 50-byte
 * steps and checks each chunk's length, the running chunk index and the total chunk count.
 */
1147 public void testFollowerToSnapshotLogic() {
1148 logStart("testFollowerToSnapshotLogic");
1150 MockRaftActorContext actorContext = createActorContext();
// Config with an overridden snapshot chunk size.
// NOTE(review): presumably 50, matching the step used by the loop below - confirm.
1152 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1154 public int getSnapshotChunkSize() {
1159 leader = new Leader(actorContext);
// State that is serialized and used as the snapshot payload.
1161 Map<String, String> leadersSnapshot = new HashMap<>();
1162 leadersSnapshot.put("1", "A");
1163 leadersSnapshot.put("2", "B");
1164 leadersSnapshot.put("3", "C");
1166 ByteString bs = toByteString(leadersSnapshot);
1167 byte[] barray = bs.toByteArray();
// FollowerToSnapshot is an inner class of the leader, hence the leader.new syntax.
1169 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1170 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1172 assertEquals(bs.size(), barray.length);
// i is the start offset of the expected chunk; it advances by 50 bytes per iteration.
1175 for (int i=0; i < barray.length; i = i + 50) {
// Detects the final chunk, which would extend past the end of the array.
1179 if (i + 50 > barray.length) {
1183 byte[] chunk = fts.getNextChunk();
// The chunk length must equal j-i and the tracker's index must match the loop's chunkIndex.
1184 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
1185 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
// Mark the chunk as sent successfully and move on unless it was the last one.
1187 fts.markSendStatus(true);
1188 if (!fts.isLastChunk(chunkIndex)) {
1189 fts.incrementChunkIndex();
// The final chunkIndex value must equal the tracker's total chunk count.
1193 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
/**
 * Creates the behavior under test.
 *
 * @param actorContext the context the new behavior operates on
 * @return a new Leader constructed with the given context
 */
1197 protected Leader createBehavior(final RaftActorContext actorContext) {
1198 return new Leader(actorContext);
/**
 * Creates a mock actor context bound to {@code leaderActor}.
 *
 * @return the result of {@code createActorContext(leaderActor)}
 */
1202 protected MockRaftActorContext createActorContext() {
1203 return createActorContext(leaderActor);
/**
 * Creates a mock actor context for the given actor using {@code LEADER_ID} as its id.
 *
 * @param actorRef the actor the context is bound to
 * @return the result of {@code createActorContext(LEADER_ID, actorRef)}
 */
1207 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1208 return createActorContext(LEADER_ID, actorRef);
/**
 * Creates the leader's actor context and registers a single peer: {@code FOLLOWER_ID}, mapped to
 * the path of {@code followerActor}.
 *
 * @return the context with its peer addresses set
 */
1211 private MockRaftActorContext createActorContextWithFollower() {
1212 MockRaftActorContext actorContext = createActorContext();
// Immutable single-entry peer map: follower id -> follower actor path.
1213 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1214 followerActor.path().toString()).build());
1215 return actorContext;
/**
 * Builds a MockRaftActorContext for the given member id and actor, configured with a 50 ms
 * heartbeat interval, an election timeout factor of 100000 and this test's payload version.
 *
 * @param id the member id passed to the MockRaftActorContext constructor
 * @param actorRef the actor passed to the MockRaftActorContext constructor
 */
1218 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1219 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1220 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
// NOTE(review): presumably the very large factor keeps election timeouts from firing during
// tests - confirm.
1221 configParams.setElectionTimeoutFactor(100000);
1222 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1223 context.setConfigParams(configParams);
1224 context.setPayloadVersion(payloadVersion);
/**
 * Creates an actor context for the follower ({@code FOLLOWER_ID} / {@code followerActor}) whose
 * only peer is the leader ({@code LEADER_ID}, mapped to the path of {@code leaderActor}).
 *
 * @return the follower's context with a fresh config and the leader registered as its peer
 */
1228 private MockRaftActorContext createFollowerActorContextWithLeader() {
1229 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// A new config object is installed here, so only the election timeout factor below is set
// explicitly; the values configured inside createActorContext(String, ActorRef) belong to the
// config object that this call replaces.
1230 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1231 followerConfig.setElectionTimeoutFactor(10000);
1232 followerActorContext.setConfigParams(followerConfig);
1233 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1234 return followerActorContext;
/**
 * Verifies the initial exchange when the leader is created with commit index 1 and the leader
 * and follower logs are built identically: the first AppendEntries carries leader commit 1, no
 * entries and prevLogIndex 0, and the follower's reply reports last log index 2 in term 1.
 */
1238 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1239 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1241 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1243 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Install a real Follower behavior on the follower actor so it handles the leader's messages.
1245 Follower follower = new Follower(followerActorContext);
1246 followerActor.underlyingActor().setBehavior(follower);
1247 followerActorContext.setCurrentBehavior(follower);
// NOTE(review): createActorContextWithFollower() already registers this same peer address, so
// this map looks redundant - confirm before removing.
1249 Map<String, String> peerAddresses = new HashMap<>();
1250 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1252 leaderActorContext.setPeerAddresses(peerAddresses);
// The existing log is emptied and then replaced wholesale by the builder's log.
1254 leaderActorContext.getReplicatedLog().removeFrom(0);
1257 leaderActorContext.setReplicatedLog(
1258 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1260 leaderActorContext.setCommitIndex(1);
1262 followerActorContext.getReplicatedLog().removeFrom(0);
1264 // follower too has the exact same log entries and has the same commit index
1265 followerActorContext.setReplicatedLog(
1266 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1268 followerActorContext.setCommitIndex(1);
1270 leader = new Leader(leaderActorContext);
1271 leaderActorContext.setCurrentBehavior(leader);
// Initial AppendEntries sent when the leader is created.
1273 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1275 assertEquals(1, appendEntries.getLeaderCommit());
1276 assertEquals(0, appendEntries.getEntries().size());
1277 assertEquals(0, appendEntries.getPrevLogIndex());
// The follower's reply arrives at the leader actor.
1279 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1280 leaderActor, AppendEntriesReply.class);
1282 assertEquals(2, appendEntriesReply.getLogLastIndex());
1283 assertEquals(1, appendEntriesReply.getLogLastTerm());
1285 // follower returns its last log index and term
// NOTE(review): these two assertions repeat the pair directly above.
1286 assertEquals(2, appendEntriesReply.getLogLastIndex());
1287 assertEquals(1, appendEntriesReply.getLogLastTerm());
/**
 * Verifies the exchange when the follower's commit index (2) is ahead of the leader's (1) and
 * both logs are built identically: the initial AppendEntries carries leader commit 1; after the
 * leader handles the follower's reply, the next heartbeat carries leader commit 2 and
 * prevLogIndex 2, and the follower's commit index stays at 2.
 */
1293 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1294 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1296 MockRaftActorContext leaderActorContext = createActorContext();
1298 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1299 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
// Install a real Follower behavior on the follower actor so it handles the leader's messages.
1301 Follower follower = new Follower(followerActorContext);
1302 followerActor.underlyingActor().setBehavior(follower);
1303 followerActorContext.setCurrentBehavior(follower);
// Register the follower as the leader's only peer.
1305 Map<String, String> leaderPeerAddresses = new HashMap<>();
1306 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1308 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
// The existing log is emptied and then replaced wholesale by the builder's log.
1310 leaderActorContext.getReplicatedLog().removeFrom(0);
1312 leaderActorContext.setReplicatedLog(
1313 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1315 leaderActorContext.setCommitIndex(1);
1317 followerActorContext.getReplicatedLog().removeFrom(0);
1319 followerActorContext.setReplicatedLog(
1320 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1322 // follower has the same log entries but its commit index > leader's commit index
1323 followerActorContext.setCommitIndex(2);
1325 leader = new Leader(leaderActorContext);
1327 // Initial heartbeat
1328 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1330 assertEquals(1, appendEntries.getLeaderCommit());
1331 assertEquals(0, appendEntries.getEntries().size());
1332 assertEquals(0, appendEntries.getPrevLogIndex());
1334 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1335 leaderActor, AppendEntriesReply.class);
1337 assertEquals(2, appendEntriesReply.getLogLastIndex());
1338 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): this installs the *follower* behavior on leaderActor, whereas the sibling tests
// install `leader` there - confirm this is intended and not a typo.
1340 leaderActor.underlyingActor().setBehavior(follower);
// Feed the follower's reply to the leader directly.
1341 leader.handleMessage(followerActor, appendEntriesReply);
// Reset both actors' collected messages so the lookups below only see the second round.
1343 leaderActor.underlyingActor().clear();
1344 followerActor.underlyingActor().clear();
// Wait one configured heartbeat interval before triggering the heartbeat by hand.
1346 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1347 TimeUnit.MILLISECONDS);
1349 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1351 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Second round: leader commit and prevLogIndex are now both 2, still without entries.
1353 assertEquals(2, appendEntries.getLeaderCommit());
1354 assertEquals(0, appendEntries.getEntries().size());
1355 assertEquals(2, appendEntries.getPrevLogIndex());
1357 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1359 assertEquals(2, appendEntriesReply.getLogLastIndex());
1360 assertEquals(1, appendEntriesReply.getLogLastTerm());
1362 assertEquals(2, followerActorContext.getCommitIndex());
/**
 * Verifies catch-up of a follower whose log is shorter than the leader's: after the leader handles
 * the follower's reply to the initial AppendEntries, it sends the entries at indices 1 and 2, the
 * follower applies both, and the follower's commit index and last index both end at 2.
 */
1368 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1369 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1371 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// NOTE(review): presumably the 1000 s interval keeps scheduled heartbeats from interfering with
// the message counts asserted below - confirm.
1372 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1373 new FiniteDuration(1000, TimeUnit.SECONDS));
// Leader log built by createEntries(0, 3, 1); commit index and lastApplied are both 2.
1375 leaderActorContext.setReplicatedLog(
1376 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1377 long leaderCommitIndex = 2;
1378 leaderActorContext.setCommitIndex(leaderCommitIndex);
1379 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the entries the follower is expected to receive.
1381 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1382 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1384 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// Follower log built by createEntries(0, 1, 1); commit index and lastApplied are both 0.
1386 followerActorContext.setReplicatedLog(
1387 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1388 followerActorContext.setCommitIndex(0);
1389 followerActorContext.setLastApplied(0);
// Install a real Follower behavior on the follower actor so it handles the leader's messages.
1391 Follower follower = new Follower(followerActorContext);
1392 followerActor.underlyingActor().setBehavior(follower);
1394 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply to it.
1396 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1397 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
// Clear both collectors so the expectations below only see the second round.
1399 MessageCollectorActor.clearMessages(followerActor);
1400 MessageCollectorActor.clearMessages(leaderActor);
1402 // Verify initial AppendEntries sent with the leader's current commit index.
1403 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1404 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1405 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1407 leaderActor.underlyingActor().setBehavior(leader);
// Hand the follower's reply (a failure, per the test name) to the leader.
1409 leader.handleMessage(followerActor, appendEntriesReply);
// Exactly one new reply is expected at the leader, plus a new AppendEntries at the follower.
1411 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1412 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1414 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1415 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1416 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
// The two entries sent are the leader's entries at indices 1 and 2.
1418 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1419 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1420 appendEntries.getEntries().get(0).getData());
1421 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1422 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1423 appendEntries.getEntries().get(1).getData());
// The leader's next index for the follower is now one past the last entry sent.
1425 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1426 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
// The follower must have produced one ApplyState per received entry, in order.
1428 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1430 ApplyState applyState = applyStateList.get(0);
1431 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1432 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1433 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1434 applyState.getReplicatedLogEntry().getData());
1436 applyState = applyStateList.get(1);
1437 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1438 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1439 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1440 applyState.getReplicatedLogEntry().getData());
// Final follower state: caught up with the leader.
1442 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1443 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
/**
 * Verifies catch-up of a follower with an empty log: after the leader handles the follower's reply
 * to the initial AppendEntries, it sends the entries at indices 0 and 1, the follower applies both,
 * and the follower's commit index and last index both end at 1.
 */
1447 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1448 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1450 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// NOTE(review): presumably the 1000 s interval keeps scheduled heartbeats from interfering with
// the message counts asserted below - confirm.
1451 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1452 new FiniteDuration(1000, TimeUnit.SECONDS));
// Leader log built by createEntries(0, 2, 1); commit index and lastApplied are both 1.
1454 leaderActorContext.setReplicatedLog(
1455 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1456 long leaderCommitIndex = 1;
1457 leaderActorContext.setCommitIndex(leaderCommitIndex);
1458 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the entries the follower is expected to receive.
1460 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1461 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1463 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// Follower starts with a log built without any entries; commit index and lastApplied are -1.
1465 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1466 followerActorContext.setCommitIndex(-1);
1467 followerActorContext.setLastApplied(-1);
// Install a real Follower behavior on the follower actor so it handles the leader's messages.
1469 Follower follower = new Follower(followerActorContext);
1470 followerActor.underlyingActor().setBehavior(follower);
1471 followerActorContext.setCurrentBehavior(follower);
1473 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply to it.
1475 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1476 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
// Clear both collectors so the expectations below only see the second round.
1478 MessageCollectorActor.clearMessages(followerActor);
1479 MessageCollectorActor.clearMessages(leaderActor);
1481 // Verify initial AppendEntries sent with the leader's current commit index.
1482 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1483 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1484 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1486 leaderActor.underlyingActor().setBehavior(leader);
1487 leaderActorContext.setCurrentBehavior(leader);
// Hand the follower's reply (a failure, per the test name) to the leader.
1489 leader.handleMessage(followerActor, appendEntriesReply);
// Exactly one new reply is expected at the leader, plus a new AppendEntries at the follower.
1491 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1492 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1494 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1495 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1496 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
// The two entries sent are the leader's entries at indices 0 and 1.
1498 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1499 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1500 appendEntries.getEntries().get(0).getData());
1501 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1502 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1503 appendEntries.getEntries().get(1).getData());
// The leader's next index for the follower is now one past the last entry sent.
1505 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1506 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
// The follower must have produced one ApplyState per received entry, in order.
1508 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1510 ApplyState applyState = applyStateList.get(0);
1511 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1512 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1513 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1514 applyState.getReplicatedLogEntry().getData());
1516 applyState = applyStateList.get(1);
1517 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1518 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1519 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1520 applyState.getReplicatedLogEntry().getData());
// Final follower state: caught up with the leader.
1522 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1523 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1527 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1528 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1530 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1531 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1532 new FiniteDuration(1000, TimeUnit.SECONDS));
1534 leaderActorContext.setReplicatedLog(
1535 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1536 long leaderCommitIndex = 1;
1537 leaderActorContext.setCommitIndex(leaderCommitIndex);
1538 leaderActorContext.setLastApplied(leaderCommitIndex);
1540 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1541 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1543 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1545 followerActorContext.setReplicatedLog(
1546 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1547 followerActorContext.setCommitIndex(-1);
1548 followerActorContext.setLastApplied(-1);
1550 Follower follower = new Follower(followerActorContext);
1551 followerActor.underlyingActor().setBehavior(follower);
1552 followerActorContext.setCurrentBehavior(follower);
1554 leader = new Leader(leaderActorContext);
1556 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1557 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1559 MessageCollectorActor.clearMessages(followerActor);
1560 MessageCollectorActor.clearMessages(leaderActor);
1562 // Verify initial AppendEntries sent with the leader's current commit index.
1563 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1564 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1565 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1567 leaderActor.underlyingActor().setBehavior(leader);
1568 leaderActorContext.setCurrentBehavior(leader);
1570 leader.handleMessage(followerActor, appendEntriesReply);
1572 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1573 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1575 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1576 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1577 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1579 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1580 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1581 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1582 appendEntries.getEntries().get(0).getData());
1583 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1584 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1585 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1586 appendEntries.getEntries().get(1).getData());
1588 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1589 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1591 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1593 ApplyState applyState = applyStateList.get(0);
1594 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1595 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1596 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1597 applyState.getReplicatedLogEntry().getData());
1599 applyState = applyStateList.get(1);
1600 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1601 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1602 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1603 applyState.getReplicatedLogEntry().getData());
1605 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1606 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1607 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1611 public void testHandleAppendEntriesReplyWithNewerTerm(){
1612 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1614 MockRaftActorContext leaderActorContext = createActorContext();
1615 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1616 new FiniteDuration(10000, TimeUnit.SECONDS));
1618 leaderActorContext.setReplicatedLog(
1619 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1621 leader = new Leader(leaderActorContext);
1622 leaderActor.underlyingActor().setBehavior(leader);
1623 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1625 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1627 assertEquals(false, appendEntriesReply.isSuccess());
1628 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1630 MessageCollectorActor.clearMessages(leaderActor);
1634 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1635 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1637 MockRaftActorContext leaderActorContext = createActorContext();
1638 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1639 new FiniteDuration(10000, TimeUnit.SECONDS));
1641 leaderActorContext.setReplicatedLog(
1642 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1643 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1645 leader = new Leader(leaderActorContext);
1646 leaderActor.underlyingActor().setBehavior(leader);
1647 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1649 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1651 assertEquals(false, appendEntriesReply.isSuccess());
1652 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1654 MessageCollectorActor.clearMessages(leaderActor);
1658 public void testHandleAppendEntriesReplySuccess() throws Exception {
1659 logStart("testHandleAppendEntriesReplySuccess");
1661 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1663 leaderActorContext.setReplicatedLog(
1664 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1666 leaderActorContext.setCommitIndex(1);
1667 leaderActorContext.setLastApplied(1);
1668 leaderActorContext.getTermInformation().update(1, "leader");
1670 leader = new Leader(leaderActorContext);
1672 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1674 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1675 assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1677 short payloadVersion = 5;
1678 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1680 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1682 assertEquals(RaftState.Leader, raftActorBehavior.state());
1684 assertEquals(2, leaderActorContext.getCommitIndex());
1686 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1687 leaderActor, ApplyJournalEntries.class);
1689 assertEquals(2, leaderActorContext.getLastApplied());
1691 assertEquals(2, applyJournalEntries.getToIndex());
1693 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1696 assertEquals(1,applyStateList.size());
1698 ApplyState applyState = applyStateList.get(0);
1700 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1702 assertEquals(2, followerInfo.getMatchIndex());
1703 assertEquals(3, followerInfo.getNextIndex());
1704 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1705 assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1709 public void testHandleAppendEntriesReplyUnknownFollower(){
1710 logStart("testHandleAppendEntriesReplyUnknownFollower");
1712 MockRaftActorContext leaderActorContext = createActorContext();
1714 leader = new Leader(leaderActorContext);
1716 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1718 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1720 assertEquals(RaftState.Leader, raftActorBehavior.state());
1724 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1725 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1727 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1728 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1729 new FiniteDuration(1000, TimeUnit.SECONDS));
1730 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1732 leaderActorContext.setReplicatedLog(
1733 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1734 long leaderCommitIndex = 3;
1735 leaderActorContext.setCommitIndex(leaderCommitIndex);
1736 leaderActorContext.setLastApplied(leaderCommitIndex);
1738 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1739 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1740 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1741 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1743 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1745 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1746 followerActorContext.setCommitIndex(-1);
1747 followerActorContext.setLastApplied(-1);
1749 Follower follower = new Follower(followerActorContext);
1750 followerActor.underlyingActor().setBehavior(follower);
1751 followerActorContext.setCurrentBehavior(follower);
1753 leader = new Leader(leaderActorContext);
1755 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1756 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1758 MessageCollectorActor.clearMessages(followerActor);
1759 MessageCollectorActor.clearMessages(leaderActor);
1761 // Verify initial AppendEntries sent with the leader's current commit index.
1762 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1763 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1764 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1766 leaderActor.underlyingActor().setBehavior(leader);
1767 leaderActorContext.setCurrentBehavior(leader);
1769 leader.handleMessage(followerActor, appendEntriesReply);
1771 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1772 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1774 appendEntries = appendEntriesList.get(0);
1775 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1776 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1777 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1779 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1780 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1781 appendEntries.getEntries().get(0).getData());
1782 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1783 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1784 appendEntries.getEntries().get(1).getData());
1786 appendEntries = appendEntriesList.get(1);
1787 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1788 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1789 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1791 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1792 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1793 appendEntries.getEntries().get(0).getData());
1794 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1795 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1796 appendEntries.getEntries().get(1).getData());
1798 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1799 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1801 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1803 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1804 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1808 public void testHandleRequestVoteReply(){
1809 logStart("testHandleRequestVoteReply");
1811 MockRaftActorContext leaderActorContext = createActorContext();
1813 leader = new Leader(leaderActorContext);
1815 // Should be a no-op.
1816 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1817 new RequestVoteReply(1, true));
1819 assertEquals(RaftState.Leader, raftActorBehavior.state());
1821 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1823 assertEquals(RaftState.Leader, raftActorBehavior.state());
1827 public void testIsolatedLeaderCheckNoFollowers() {
1828 logStart("testIsolatedLeaderCheckNoFollowers");
1830 MockRaftActorContext leaderActorContext = createActorContext();
1832 leader = new Leader(leaderActorContext);
1833 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1834 assertTrue(behavior instanceof Leader);
1837 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1838 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1839 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1841 MockRaftActorContext leaderActorContext = createActorContext();
1843 Map<String, String> peerAddresses = new HashMap<>();
1844 peerAddresses.put("follower-1", followerActor1.path().toString());
1845 peerAddresses.put("follower-2", followerActor2.path().toString());
1847 leaderActorContext.setPeerAddresses(peerAddresses);
1848 leaderActorContext.setRaftPolicy(raftPolicy);
1850 leader = new Leader(leaderActorContext);
1852 leader.markFollowerActive("follower-1");
1853 leader.markFollowerActive("follower-2");
1854 RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1855 assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
1857 // kill 1 follower and verify if that got killed
1858 final JavaTestKit probe = new JavaTestKit(getSystem());
1859 probe.watch(followerActor1);
1860 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1861 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1862 assertEquals(termMsg1.getActor(), followerActor1);
1864 leader.markFollowerInActive("follower-1");
1865 leader.markFollowerActive("follower-2");
1866 behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1867 assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
1869 // kill 2nd follower and leader should change to Isolated leader
1870 followerActor2.tell(PoisonPill.getInstance(), null);
1871 probe.watch(followerActor2);
1872 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1873 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1874 assertEquals(termMsg2.getActor(), followerActor2);
1876 leader.markFollowerInActive("follower-2");
1877 return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1881 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1882 logStart("testIsolatedLeaderCheckTwoFollowers");
1884 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1886 assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1887 behavior instanceof IsolatedLeader);
1891 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1892 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1894 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1896 assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1897 behavior instanceof Leader);
1901 public void testLaggingFollowerStarvation() throws Exception {
1902 logStart("testLaggingFollowerStarvation");
1903 new JavaTestKit(getSystem()) {{
1904 String leaderActorId = actorFactory.generateActorId("leader");
1905 String follower1ActorId = actorFactory.generateActorId("follower");
1906 String follower2ActorId = actorFactory.generateActorId("follower");
1908 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1909 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1910 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1911 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1913 MockRaftActorContext leaderActorContext =
1914 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1916 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1917 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1918 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1920 leaderActorContext.setConfigParams(configParams);
1922 leaderActorContext.setReplicatedLog(
1923 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1925 Map<String, String> peerAddresses = new HashMap<>();
1926 peerAddresses.put(follower1ActorId,
1927 follower1Actor.path().toString());
1928 peerAddresses.put(follower2ActorId,
1929 follower2Actor.path().toString());
1931 leaderActorContext.setPeerAddresses(peerAddresses);
1932 leaderActorContext.getTermInformation().update(1, leaderActorId);
1934 RaftActorBehavior leader = createBehavior(leaderActorContext);
1936 leaderActor.underlyingActor().setBehavior(leader);
1938 for(int i=1;i<6;i++) {
1939 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1940 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1941 assertTrue(newBehavior == leader);
1942 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1945 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1946 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1948 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1949 heartbeats.size() > 1);
1951 // Check if follower-2 got AppendEntries during this time and was not starved
1952 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1954 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1955 appendEntries.size() > 1);
1961 public void testReplicationConsensusWithNonVotingFollower() {
1962 logStart("testReplicationConsensusWithNonVotingFollower");
1964 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1965 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1966 new FiniteDuration(1000, TimeUnit.SECONDS));
1968 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1970 String nonVotingFollowerId = "nonvoting-follower";
1971 TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1972 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1974 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1976 leader = new Leader(leaderActorContext);
1977 leaderActorContext.setCurrentBehavior(leader);
1979 // Ignore initial heartbeats
1980 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1981 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1983 MessageCollectorActor.clearMessages(followerActor);
1984 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1985 MessageCollectorActor.clearMessages(leaderActor);
1987 // Send a Replicate message and wait for AppendEntries.
1988 sendReplicate(leaderActorContext, 0);
1990 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1991 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1993 // Send reply only from the voting follower and verify consensus via ApplyState.
1994 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1996 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1998 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2000 MessageCollectorActor.clearMessages(followerActor);
2001 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2002 MessageCollectorActor.clearMessages(leaderActor);
2004 // Send another Replicate message
2005 sendReplicate(leaderActorContext, 1);
2007 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2008 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2009 AppendEntries.class);
2010 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2011 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2013 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2014 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2016 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2018 // Send reply from the voting follower and verify consensus.
2019 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2021 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2025 public void testTransferLeadershipWithFollowerInSync() {
2026 logStart("testTransferLeadershipWithFollowerInSync");
2028 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2029 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2030 new FiniteDuration(1000, TimeUnit.SECONDS));
2031 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2033 leader = new Leader(leaderActorContext);
2034 leaderActorContext.setCurrentBehavior(leader);
2036 // Initial heartbeat
2037 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2038 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2039 MessageCollectorActor.clearMessages(followerActor);
2041 sendReplicate(leaderActorContext, 0);
2042 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2044 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2045 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2046 MessageCollectorActor.clearMessages(followerActor);
2048 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2049 leader.transferLeadership(mockTransferCohort);
2051 verify(mockTransferCohort, never()).transferComplete();
2052 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2053 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2055 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2056 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2058 // Leader should force an election timeout
2059 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2061 verify(mockTransferCohort).transferComplete();
2065 public void testTransferLeadershipWithEmptyLog() {
2066 logStart("testTransferLeadershipWithEmptyLog");
2068 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2069 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2070 new FiniteDuration(1000, TimeUnit.SECONDS));
2071 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2073 leader = new Leader(leaderActorContext);
2074 leaderActorContext.setCurrentBehavior(leader);
2076 // Initial heartbeat
2077 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2078 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2079 MessageCollectorActor.clearMessages(followerActor);
2081 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2082 leader.transferLeadership(mockTransferCohort);
2084 verify(mockTransferCohort, never()).transferComplete();
2085 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2086 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2088 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2089 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2091 // Leader should force an election timeout
2092 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2094 verify(mockTransferCohort).transferComplete();
2098 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2099 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2101 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2102 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2103 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2105 leader = new Leader(leaderActorContext);
2106 leaderActorContext.setCurrentBehavior(leader);
2108 // Initial heartbeat
2109 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2110 MessageCollectorActor.clearMessages(followerActor);
2112 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2113 leader.transferLeadership(mockTransferCohort);
2115 verify(mockTransferCohort, never()).transferComplete();
2117 // Sync up the follower.
2118 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2119 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2120 MessageCollectorActor.clearMessages(followerActor);
2122 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2123 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2124 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2125 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2126 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2128 // Leader should force an election timeout
2129 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2131 verify(mockTransferCohort).transferComplete();
2135 public void testTransferLeadershipWithFollowerSyncTimeout() {
2136 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2138 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2139 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2140 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2141 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2142 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2144 leader = new Leader(leaderActorContext);
2145 leaderActorContext.setCurrentBehavior(leader);
2147 // Initial heartbeat
2148 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2149 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2150 MessageCollectorActor.clearMessages(followerActor);
2152 sendReplicate(leaderActorContext, 0);
2153 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2155 MessageCollectorActor.clearMessages(followerActor);
2157 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2158 leader.transferLeadership(mockTransferCohort);
2160 verify(mockTransferCohort, never()).transferComplete();
2162 // Send heartbeats to time out the transfer.
2163 for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2164 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2165 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2166 leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2169 verify(mockTransferCohort).abortTransfer();
2170 verify(mockTransferCohort, never()).transferComplete();
2171 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2175 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2176 ActorRef actorRef, RaftRPC rpc) throws Exception {
2177 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2178 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2181 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2183 private final long electionTimeOutIntervalMillis;
2184 private final int snapshotChunkSize;
2186 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2188 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2189 this.snapshotChunkSize = snapshotChunkSize;
2193 public FiniteDuration getElectionTimeOutInterval() {
2194 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2198 public int getSnapshotChunkSize() {
2199 return snapshotChunkSize;