2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Mockito.mock;
15 import static org.mockito.Mockito.never;
16 import static org.mockito.Mockito.verify;
17 import akka.actor.ActorRef;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.collect.ImmutableMap;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import com.google.protobuf.ByteString;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.List;
30 import java.util.concurrent.TimeUnit;
31 import org.junit.After;
32 import org.junit.Assert;
33 import org.junit.Test;
34 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
35 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
36 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
37 import org.opendaylight.controller.cluster.raft.RaftActorContext;
38 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
39 import org.opendaylight.controller.cluster.raft.RaftState;
40 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
41 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
42 import org.opendaylight.controller.cluster.raft.SerializationUtils;
43 import org.opendaylight.controller.cluster.raft.Snapshot;
44 import org.opendaylight.controller.cluster.raft.VotingState;
45 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
46 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
47 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
48 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
49 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
50 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
51 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
53 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
54 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
56 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
57 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
58 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
59 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
60 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
61 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
62 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
63 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
64 import scala.concurrent.duration.FiniteDuration;
66 public class LeaderTest extends AbstractLeaderTest {
68 static final String FOLLOWER_ID = "follower";
69 public static final String LEADER_ID = "leader";
71 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
72 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
74 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
75 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
77 private Leader leader;
78 private final short payloadVersion = 5;
82 public void tearDown() throws Exception {
91 public void testHandleMessageForUnknownMessage() throws Exception {
92 logStart("testHandleMessageForUnknownMessage");
94 leader = new Leader(createActorContext());
96 // handle message should return the Leader state when it receives an
98 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
99 Assert.assertTrue(behavior instanceof Leader);
103 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
104 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
106 MockRaftActorContext actorContext = createActorContextWithFollower();
107 short payloadVersion = (short)5;
108 actorContext.setPayloadVersion(payloadVersion);
111 actorContext.getTermInformation().update(term, "");
113 leader = new Leader(actorContext);
115 // Leader should send an immediate heartbeat with no entries as follower is inactive.
116 long lastIndex = actorContext.getReplicatedLog().lastIndex();
117 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
118 assertEquals("getTerm", term, appendEntries.getTerm());
119 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
120 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
121 assertEquals("Entries size", 0, appendEntries.getEntries().size());
122 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
124 // The follower would normally reply - simulate that explicitly here.
125 leader.handleMessage(followerActor, new AppendEntriesReply(
126 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
127 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
129 followerActor.underlyingActor().clear();
131 // Sleep for the heartbeat interval so AppendEntries is sent.
132 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
133 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
135 leader.handleMessage(leaderActor, new SendHeartBeat());
137 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
138 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
139 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
140 assertEquals("Entries size", 1, appendEntries.getEntries().size());
141 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
142 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
143 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
147 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
148 return sendReplicate(actorContext, 1, index);
151 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
152 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
153 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
154 term, index, payload);
155 actorContext.getReplicatedLog().append(newEntry);
156 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
160 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
161 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
163 MockRaftActorContext actorContext = createActorContextWithFollower();
166 actorContext.getTermInformation().update(term, "");
168 leader = new Leader(actorContext);
170 // Leader will send an immediate heartbeat - ignore it.
171 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
173 // The follower would normally reply - simulate that explicitly here.
174 long lastIndex = actorContext.getReplicatedLog().lastIndex();
175 leader.handleMessage(followerActor, new AppendEntriesReply(
176 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
177 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
179 followerActor.underlyingActor().clear();
181 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
183 // State should not change
184 assertTrue(raftBehavior instanceof Leader);
186 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
187 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
188 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
189 assertEquals("Entries size", 1, appendEntries.getEntries().size());
190 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
191 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
192 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
193 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
197 public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
198 logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
200 MockRaftActorContext actorContext = createActorContextWithFollower();
202 // The raft context is initialized with a couple log entries. However the commitIndex
203 // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
204 // committed and applied. Now it regains leadership with a higher term (2).
205 long prevTerm = actorContext.getTermInformation().getCurrentTerm();
206 long newTerm = prevTerm + 1;
207 actorContext.getTermInformation().update(newTerm, "");
209 leader = new Leader(actorContext);
211 // Leader will send an immediate heartbeat - ignore it.
212 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
214 // The follower replies with the leader's current last index and term, simulating that it is
215 // up to date with the leader.
216 long lastIndex = actorContext.getReplicatedLog().lastIndex();
217 leader.handleMessage(followerActor, new AppendEntriesReply(
218 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
220 // The commit index should not get updated even though consensus was reached. This is b/c the
221 // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
222 // from previous terms by counting replicas".
223 assertEquals("Commit Index", -1, actorContext.getCommitIndex());
225 followerActor.underlyingActor().clear();
227 // Now replicate a new entry with the new term 2.
228 long newIndex = lastIndex + 1;
229 sendReplicate(actorContext, newTerm, newIndex);
231 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
232 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
233 assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
234 assertEquals("Entries size", 1, appendEntries.getEntries().size());
235 assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
236 assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
237 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
239 // The follower replies with success. The leader should now update the commit index to the new index
240 // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
241 // prior entries are committed indirectly".
242 leader.handleMessage(followerActor, new AppendEntriesReply(
243 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
245 assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
249 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
250 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
252 MockRaftActorContext actorContext = createActorContextWithFollower();
253 actorContext.setRaftPolicy(createRaftPolicy(true, true));
256 actorContext.getTermInformation().update(term, "");
258 leader = new Leader(actorContext);
260 // Leader will send an immediate heartbeat - ignore it.
261 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
263 // The follower would normally reply - simulate that explicitly here.
264 long lastIndex = actorContext.getReplicatedLog().lastIndex();
265 leader.handleMessage(followerActor, new AppendEntriesReply(
266 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
267 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
269 followerActor.underlyingActor().clear();
271 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
273 // State should not change
274 assertTrue(raftBehavior instanceof Leader);
276 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
277 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
278 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
279 assertEquals("Entries size", 1, appendEntries.getEntries().size());
280 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
281 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
282 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
283 assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
287 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
288 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
290 MockRaftActorContext actorContext = createActorContextWithFollower();
291 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
293 public FiniteDuration getHeartBeatInterval() {
294 return FiniteDuration.apply(5, TimeUnit.SECONDS);
299 actorContext.getTermInformation().update(term, "");
301 leader = new Leader(actorContext);
303 // Leader will send an immediate heartbeat - ignore it.
304 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
306 // The follower would normally reply - simulate that explicitly here.
307 long lastIndex = actorContext.getReplicatedLog().lastIndex();
308 leader.handleMessage(followerActor, new AppendEntriesReply(
309 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
310 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
312 followerActor.underlyingActor().clear();
314 for(int i=0;i<5;i++) {
315 sendReplicate(actorContext, lastIndex+i+1);
318 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
319 // We expect only 1 message to be sent because of two reasons,
320 // - an append entries reply was not received
321 // - the heartbeat interval has not expired
322 // In this scenario if multiple messages are sent they would likely be duplicates
323 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
327 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
328 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
330 MockRaftActorContext actorContext = createActorContextWithFollower();
331 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
333 public FiniteDuration getHeartBeatInterval() {
334 return FiniteDuration.apply(5, TimeUnit.SECONDS);
339 actorContext.getTermInformation().update(term, "");
341 leader = new Leader(actorContext);
343 // Leader will send an immediate heartbeat - ignore it.
344 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
346 // The follower would normally reply - simulate that explicitly here.
347 long lastIndex = actorContext.getReplicatedLog().lastIndex();
348 leader.handleMessage(followerActor, new AppendEntriesReply(
349 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
350 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
352 followerActor.underlyingActor().clear();
354 for(int i=0;i<3;i++) {
355 sendReplicate(actorContext, lastIndex+i+1);
356 leader.handleMessage(followerActor, new AppendEntriesReply(
357 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
361 for(int i=3;i<5;i++) {
362 sendReplicate(actorContext, lastIndex + i + 1);
365 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
366 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
367 // get sent to the follower - but not the 5th
368 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
370 for(int i=0;i<4;i++) {
371 long expected = allMessages.get(i).getEntries().get(0).getIndex();
372 assertEquals(expected, i+2);
377 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
378 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
380 MockRaftActorContext actorContext = createActorContextWithFollower();
381 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
383 public FiniteDuration getHeartBeatInterval() {
384 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
389 actorContext.getTermInformation().update(term, "");
391 leader = new Leader(actorContext);
393 // Leader will send an immediate heartbeat - ignore it.
394 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
396 // The follower would normally reply - simulate that explicitly here.
397 long lastIndex = actorContext.getReplicatedLog().lastIndex();
398 leader.handleMessage(followerActor, new AppendEntriesReply(
399 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
400 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
402 followerActor.underlyingActor().clear();
404 sendReplicate(actorContext, lastIndex+1);
406 // Wait slightly longer than heartbeat duration
407 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
409 leader.handleMessage(leaderActor, new SendHeartBeat());
411 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
412 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
414 assertEquals(1, allMessages.get(0).getEntries().size());
415 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
416 assertEquals(1, allMessages.get(1).getEntries().size());
417 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
422 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
423 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
425 MockRaftActorContext actorContext = createActorContextWithFollower();
426 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
428 public FiniteDuration getHeartBeatInterval() {
429 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
434 actorContext.getTermInformation().update(term, "");
436 leader = new Leader(actorContext);
438 // Leader will send an immediate heartbeat - ignore it.
439 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
441 // The follower would normally reply - simulate that explicitly here.
442 long lastIndex = actorContext.getReplicatedLog().lastIndex();
443 leader.handleMessage(followerActor, new AppendEntriesReply(
444 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
445 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
447 followerActor.underlyingActor().clear();
449 for(int i=0;i<3;i++) {
450 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
451 leader.handleMessage(leaderActor, new SendHeartBeat());
454 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
455 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
459 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
460 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
462 MockRaftActorContext actorContext = createActorContextWithFollower();
463 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
465 public FiniteDuration getHeartBeatInterval() {
466 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
471 actorContext.getTermInformation().update(term, "");
473 leader = new Leader(actorContext);
475 // Leader will send an immediate heartbeat - ignore it.
476 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
478 // The follower would normally reply - simulate that explicitly here.
479 long lastIndex = actorContext.getReplicatedLog().lastIndex();
480 leader.handleMessage(followerActor, new AppendEntriesReply(
481 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
482 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
484 followerActor.underlyingActor().clear();
486 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
487 leader.handleMessage(leaderActor, new SendHeartBeat());
488 sendReplicate(actorContext, lastIndex+1);
490 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
491 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
493 assertEquals(0, allMessages.get(0).getEntries().size());
494 assertEquals(1, allMessages.get(1).getEntries().size());
499 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
500 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
502 MockRaftActorContext actorContext = createActorContext();
504 leader = new Leader(actorContext);
506 actorContext.setLastApplied(0);
508 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
509 long term = actorContext.getTermInformation().getCurrentTerm();
510 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
511 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
513 actorContext.getReplicatedLog().append(newEntry);
515 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
516 new Replicate(leaderActor, "state-id", newEntry));
518 // State should not change
519 assertTrue(raftBehavior instanceof Leader);
521 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
523 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
524 // one since lastApplied state is 0.
525 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
526 leaderActor, ApplyState.class);
527 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
529 for(int i = 0; i <= newLogIndex - 1; i++ ) {
530 ApplyState applyState = applyStateList.get(i);
531 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
532 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
535 ApplyState last = applyStateList.get((int) newLogIndex - 1);
536 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
537 assertEquals("getIdentifier", "state-id", last.getIdentifier());
541 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
542 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
544 MockRaftActorContext actorContext = createActorContextWithFollower();
546 Map<String, String> leadersSnapshot = new HashMap<>();
547 leadersSnapshot.put("1", "A");
548 leadersSnapshot.put("2", "B");
549 leadersSnapshot.put("3", "C");
552 actorContext.getReplicatedLog().removeFrom(0);
554 final int commitIndex = 3;
555 final int snapshotIndex = 2;
556 final int newEntryIndex = 4;
557 final int snapshotTerm = 1;
558 final int currentTerm = 2;
560 // set the snapshot variables in replicatedlog
561 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
562 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
563 actorContext.setCommitIndex(commitIndex);
564 //set follower timeout to 2 mins, helps during debugging
565 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
567 leader = new Leader(actorContext);
569 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
570 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
573 ReplicatedLogImplEntry entry =
574 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
575 new MockRaftActorContext.MockPayload("D"));
577 //update follower timestamp
578 leader.markFollowerActive(FOLLOWER_ID);
580 ByteString bs = toByteString(leadersSnapshot);
581 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
582 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
583 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
584 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
586 //send first chunk and no InstallSnapshotReply received yet
588 fts.incrementChunkIndex();
590 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
591 TimeUnit.MILLISECONDS);
593 leader.handleMessage(leaderActor, new SendHeartBeat());
595 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
597 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
599 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
601 //InstallSnapshotReply received
602 fts.markSendStatus(true);
604 leader.handleMessage(leaderActor, new SendHeartBeat());
606 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
608 assertEquals(commitIndex, is.getLastIncludedIndex());
612 public void testSendAppendEntriesSnapshotScenario() throws Exception {
613 logStart("testSendAppendEntriesSnapshotScenario");
615 MockRaftActorContext actorContext = createActorContextWithFollower();
617 Map<String, String> leadersSnapshot = new HashMap<>();
618 leadersSnapshot.put("1", "A");
619 leadersSnapshot.put("2", "B");
620 leadersSnapshot.put("3", "C");
623 actorContext.getReplicatedLog().removeFrom(0);
625 final int followersLastIndex = 2;
626 final int snapshotIndex = 3;
627 final int newEntryIndex = 4;
628 final int snapshotTerm = 1;
629 final int currentTerm = 2;
631 // set the snapshot variables in replicatedlog
632 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
633 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
634 actorContext.setCommitIndex(followersLastIndex);
636 leader = new Leader(actorContext);
638 // Leader will send an immediate heartbeat - ignore it.
639 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
642 ReplicatedLogImplEntry entry =
643 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
644 new MockRaftActorContext.MockPayload("D"));
646 actorContext.getReplicatedLog().append(entry);
648 //update follower timestamp
649 leader.markFollowerActive(FOLLOWER_ID);
651 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
652 RaftActorBehavior raftBehavior = leader.handleMessage(
653 leaderActor, new Replicate(null, "state-id", entry));
655 assertTrue(raftBehavior instanceof Leader);
657 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
661 public void testInitiateInstallSnapshot() throws Exception {
662 logStart("testInitiateInstallSnapshot");
664 MockRaftActorContext actorContext = createActorContextWithFollower();
667 actorContext.getReplicatedLog().removeFrom(0);
669 final int followersLastIndex = 2;
670 final int snapshotIndex = 3;
671 final int newEntryIndex = 4;
672 final int snapshotTerm = 1;
673 final int currentTerm = 2;
675 // set the snapshot variables in replicatedlog
676 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
677 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
678 actorContext.setLastApplied(3);
679 actorContext.setCommitIndex(followersLastIndex);
681 leader = new Leader(actorContext);
683 // Leader will send an immediate heartbeat - ignore it.
684 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
686 // set the snapshot as absent and check if capture-snapshot is invoked.
687 leader.setSnapshot(null);
690 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
691 new MockRaftActorContext.MockPayload("D"));
693 actorContext.getReplicatedLog().append(entry);
695 //update follower timestamp
696 leader.markFollowerActive(FOLLOWER_ID);
698 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
700 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
702 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
704 assertTrue(cs.isInstallSnapshotInitiated());
705 assertEquals(3, cs.getLastAppliedIndex());
706 assertEquals(1, cs.getLastAppliedTerm());
707 assertEquals(4, cs.getLastIndex());
708 assertEquals(2, cs.getLastTerm());
710 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
711 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
713 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
717 public void testInitiateForceInstallSnapshot() throws Exception {
718 logStart("testInitiateForceInstallSnapshot");
720 MockRaftActorContext actorContext = createActorContextWithFollower();
722 final int followersLastIndex = 2;
723 final int snapshotIndex = -1;
724 final int newEntryIndex = 4;
725 final int snapshotTerm = -1;
726 final int currentTerm = 2;
728 // set the snapshot variables in replicatedlog
729 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
730 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
731 actorContext.setLastApplied(3);
732 actorContext.setCommitIndex(followersLastIndex);
734 actorContext.getReplicatedLog().removeFrom(0);
736 leader = new Leader(actorContext);
738 // Leader will send an immediate heartbeat - ignore it.
739 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
741 // set the snapshot as absent and check if capture-snapshot is invoked.
742 leader.setSnapshot(null);
744 for(int i=0;i<4;i++) {
745 actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
746 new MockRaftActorContext.MockPayload("X" + i)));
750 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
751 new MockRaftActorContext.MockPayload("D"));
753 actorContext.getReplicatedLog().append(entry);
755 //update follower timestamp
756 leader.markFollowerActive(FOLLOWER_ID);
758 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
759 // installed with a SendInstallSnapshot
760 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
762 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
764 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
766 assertTrue(cs.isInstallSnapshotInitiated());
767 assertEquals(3, cs.getLastAppliedIndex());
768 assertEquals(1, cs.getLastAppliedTerm());
769 assertEquals(4, cs.getLastIndex());
770 assertEquals(2, cs.getLastTerm());
772 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
773 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
775 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
// Verifies that handling SendInstallSnapshot makes the leader send an InstallSnapshot message to the
// follower, and that the message carries the expected last-included index/term and the leader's term.
780 public void testInstallSnapshot() throws Exception {
781 logStart("testInstallSnapshot");
783 MockRaftActorContext actorContext = createActorContextWithFollower();
// Arbitrary map that is serialized below and used as the snapshot payload.
785 Map<String, String> leadersSnapshot = new HashMap<>();
786 leadersSnapshot.put("1", "A");
787 leadersSnapshot.put("2", "B");
788 leadersSnapshot.put("3", "C");
// Remove the leader's log entries starting at index 0 (presumably leaving the log empty).
791 actorContext.getReplicatedLog().removeFrom(0);
793 final int lastAppliedIndex = 3;
794 final int snapshotIndex = 2;
795 final int snapshotTerm = 1;
796 final int currentTerm = 2;
798 // set the snapshot variables in the replicated log and make currentTerm the leader's term
799 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
800 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
801 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
802 actorContext.setCommitIndex(lastAppliedIndex);
803 actorContext.setLastApplied(lastAppliedIndex);
805 leader = new Leader(actorContext);
807 // Initial heartbeat.
808 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Reset the leader's view of the follower: match index -1, next index 0.
810 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
811 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Build the snapshot from the serialized map, with no unapplied entries.
813 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
814 Collections.<ReplicatedLogEntry>emptyList(),
815 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
817 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
// Handling the message must not change the behavior.
819 assertTrue(raftBehavior instanceof Leader);
821 // check that the InstallSnapshot sent to the follower carries the correct values.
823 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
825 assertNotNull(installSnapshot.getData());
826 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
827 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
829 assertEquals(currentTerm, installSnapshot.getTerm());
// Same flow as testInstallSnapshot, but the replicated log has no snapshot recorded (snapshot
// index/term are -1), the leader's log is not cleared, and the follower's next index is -1.
// Verifies that an InstallSnapshot is still sent with the expected values.
833 public void testForceInstallSnapshot() throws Exception {
834 logStart("testForceInstallSnapshot");
836 MockRaftActorContext actorContext = createActorContextWithFollower();
// Arbitrary map that is serialized below and used as the snapshot payload.
838 Map<String, String> leadersSnapshot = new HashMap<>();
839 leadersSnapshot.put("1", "A");
840 leadersSnapshot.put("2", "B");
841 leadersSnapshot.put("3", "C");
843 final int lastAppliedIndex = 3;
844 final int snapshotIndex = -1;
845 final int snapshotTerm = -1;
846 final int currentTerm = 2;
848 // set the snapshot variables in the replicated log and make currentTerm the leader's term
849 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
850 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
851 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
852 actorContext.setCommitIndex(lastAppliedIndex);
853 actorContext.setLastApplied(lastAppliedIndex);
855 leader = new Leader(actorContext);
857 // Initial heartbeat.
858 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Reset the leader's view of the follower: both match index and next index are -1.
860 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
861 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
// Build the snapshot from the serialized map, with no unapplied entries.
863 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
864 Collections.<ReplicatedLogEntry>emptyList(),
865 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
867 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
// Handling the message must not change the behavior.
869 assertTrue(raftBehavior instanceof Leader);
871 // check that the InstallSnapshot sent to the follower carries the correct values.
873 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
875 assertNotNull(installSnapshot.getData());
876 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
877 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
879 assertEquals(currentTerm, installSnapshot.getTerm());
// Verifies the leader's handling of a successful InstallSnapshotReply for the last chunk: the
// follower's snapshot tracking entry is gone afterwards and the follower's match/next index are
// commitIndex and commitIndex + 1 respectively.
883 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
884 logStart("testHandleInstallSnapshotReplyLastChunk");
886 MockRaftActorContext actorContext = createActorContextWithFollower();
888 final int commitIndex = 3;
889 final int snapshotIndex = 2;
890 final int snapshotTerm = 1;
891 final int currentTerm = 2;
893 actorContext.setCommitIndex(commitIndex);
895 leader = new Leader(actorContext);
// Reset the leader's view of the follower: match index -1, next index 0.
897 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
898 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
900 // Ignore initial heartbeat.
901 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Arbitrary map that is serialized below and used as the snapshot payload.
903 Map<String, String> leadersSnapshot = new HashMap<>();
904 leadersSnapshot.put("1", "A");
905 leadersSnapshot.put("2", "B");
906 leadersSnapshot.put("3", "C");
908 // set the snapshot variables in the replicated log and make currentTerm the leader's term
910 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
911 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
912 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
914 ByteString bs = toByteString(leadersSnapshot);
915 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
916 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
// Register snapshot-transfer state for the follower directly (no SendInstallSnapshot is handled here)
// and advance its chunk index until it refers to the last chunk.
917 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
918 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
919 while(!fts.isLastChunk(fts.getChunkIndex())) {
921 fts.incrementChunkIndex();
// Remove the leader's log entries starting at index 0.
925 actorContext.getReplicatedLog().removeFrom(0);
// Reply successfully for the last chunk.
927 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
928 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
930 assertTrue(raftBehavior instanceof Leader);
// No follower snapshot is tracked any more; exactly one follower is still tracked.
932 assertEquals(0, leader.followerSnapshotSize());
933 assertEquals(1, leader.followerLogSize());
934 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
936 assertEquals(commitIndex, fli.getMatchIndex());
937 assertEquals(commitIndex + 1, fli.getNextIndex());
// Verifies chunked snapshot transfer driven by InstallSnapshotReply: each successful reply triggers
// the next InstallSnapshot chunk, and a reply to the final chunk triggers no further InstallSnapshot.
941 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
942 logStart("testSendSnapshotfromInstallSnapshotReply");
944 MockRaftActorContext actorContext = createActorContextWithFollower();
946 final int commitIndex = 3;
947 final int snapshotIndex = 2;
948 final int snapshotTerm = 1;
949 final int currentTerm = 2;
// Config with an overridden snapshot chunk size; the assertions below expect the snapshot to be
// split into 3 chunks.
951 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
953 public int getSnapshotChunkSize() {
// Long heartbeat / isolated-leader intervals - presumably so that no periodic message interferes
// with the chunk-by-chunk exchange below (TODO confirm).
957 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
958 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
960 actorContext.setConfigParams(configParams);
961 actorContext.setCommitIndex(commitIndex);
963 leader = new Leader(actorContext);
// Reset the leader's view of the follower: match index -1, next index 0.
965 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
966 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary map that is serialized below and used as the snapshot payload.
968 Map<String, String> leadersSnapshot = new HashMap<>();
969 leadersSnapshot.put("1", "A");
970 leadersSnapshot.put("2", "B");
971 leadersSnapshot.put("3", "C");
973 // set the snapshot variables in the replicated log and make currentTerm the leader's term
974 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
975 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
976 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
978 ByteString bs = toByteString(leadersSnapshot);
979 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
980 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
981 leader.setSnapshot(snapshot);
// Start the transfer; the first chunk must be 1 of 3.
983 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
985 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
987 assertEquals(1, installSnapshot.getChunkIndex());
988 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 1; the leader must send chunk 2 of 3.
990 followerActor.underlyingActor().clear();
991 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
992 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
994 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
996 assertEquals(2, installSnapshot.getChunkIndex());
997 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 2; another InstallSnapshot must arrive (its chunk index is not asserted here).
999 followerActor.underlyingActor().clear();
1000 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1001 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1003 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1005 // Send a snapshot reply one more time and make sure that no new InstallSnapshot message is sent to the follower
1006 followerActor.underlyingActor().clear();
1007 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1008 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
// getFirstMatching (unlike expectFirstMatching above) is used because a null result is expected here.
1010 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1012 Assert.assertNull(installSnapshot);
// Verifies that after a failed InstallSnapshotReply carrying chunk index -1, the next heartbeat makes
// the leader send chunk 1 of 3 again.
1017 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
1018 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1020 MockRaftActorContext actorContext = createActorContextWithFollower();
1022 final int commitIndex = 3;
1023 final int snapshotIndex = 2;
1024 final int snapshotTerm = 1;
1025 final int currentTerm = 2;
// Config with an overridden snapshot chunk size; the assertions below expect the snapshot to be
// split into 3 chunks.
1027 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
1029 public int getSnapshotChunkSize() {
1034 actorContext.setCommitIndex(commitIndex);
1036 leader = new Leader(actorContext);
// Reset the leader's view of the follower: match index -1, next index 0.
1038 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1039 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary map that is serialized below and used as the snapshot payload.
1041 Map<String, String> leadersSnapshot = new HashMap<>();
1042 leadersSnapshot.put("1", "A");
1043 leadersSnapshot.put("2", "B");
1044 leadersSnapshot.put("3", "C");
1046 // set the snapshot variables in the replicated log and make currentTerm the leader's term
1047 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1048 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1049 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1051 ByteString bs = toByteString(leadersSnapshot);
1052 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1053 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1054 leader.setSnapshot(snapshot);
// NOTE(review): the reason for this fixed 1 s pause is not evident from the test itself - confirm it is
// still required, since it adds a full second to the run.
1056 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
// Start the transfer; the first chunk must be 1 of 3.
1057 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1059 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1061 assertEquals(1, installSnapshot.getChunkIndex());
1062 assertEquals(3, installSnapshot.getTotalChunks());
1064 followerActor.underlyingActor().clear();
// Reply with failure and a chunk index of -1.
1066 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1067 FOLLOWER_ID, -1, false));
// Wait one heartbeat interval before triggering the heartbeat manually.
1069 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1070 TimeUnit.MILLISECONDS);
1072 leader.handleMessage(leaderActor, new SendHeartBeat());
// The first chunk must be sent again.
1074 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1076 assertEquals(1, installSnapshot.getChunkIndex());
1077 assertEquals(3, installSnapshot.getTotalChunks());
// Verifies the lastChunkHashCode carried by InstallSnapshot: the first chunk carries
// AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE and the second chunk carries the hash code of the first
// chunk's data.
1081 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1082 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1084 MockRaftActorContext actorContext = createActorContextWithFollower();
1086 final int commitIndex = 3;
1087 final int snapshotIndex = 2;
1088 final int snapshotTerm = 1;
1089 final int currentTerm = 2;
// Config with an overridden snapshot chunk size; the assertions below expect the snapshot to be
// split into 3 chunks.
1091 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1093 public int getSnapshotChunkSize() {
1098 actorContext.setCommitIndex(commitIndex);
1100 leader = new Leader(actorContext);
// Reset the leader's view of the follower: match index -1, next index 0.
1102 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1103 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Arbitrary map that is serialized below and used as the snapshot payload.
1105 Map<String, String> leadersSnapshot = new HashMap<>();
1106 leadersSnapshot.put("1", "A");
1107 leadersSnapshot.put("2", "B");
1108 leadersSnapshot.put("3", "C");
1110 // set the snapshot variables in the replicated log and make currentTerm the leader's term
1111 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1112 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1113 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1115 ByteString bs = toByteString(leadersSnapshot);
1116 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1117 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1118 leader.setSnapshot(snapshot);
// Start the transfer; the first chunk must be 1 of 3 and carry the initial hash code.
1120 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1122 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1124 assertEquals(1, installSnapshot.getChunkIndex());
1125 assertEquals(3, installSnapshot.getTotalChunks());
1126 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
// Remember the hash code of the first chunk's data for comparison with the second chunk.
1128 int hashCode = installSnapshot.getData().hashCode();
1130 followerActor.underlyingActor().clear();
// Acknowledge chunk 1 successfully.
1132 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1133 FOLLOWER_ID, 1, true));
1135 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1137 assertEquals(2, installSnapshot.getChunkIndex());
1138 assertEquals(3, installSnapshot.getTotalChunks());
1139 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
// Exercises FollowerToSnapshot directly: walks the serialized snapshot in steps of 50 bytes and checks
// the size and index of each chunk returned by getNextChunk(), then checks the total chunk count.
1143 public void testFollowerToSnapshotLogic() {
1144 logStart("testFollowerToSnapshotLogic");
1146 MockRaftActorContext actorContext = createActorContext();
// Config with an overridden snapshot chunk size.
// NOTE(review): the step of 50 used in the loop below presumably mirrors the value returned by this
// override - confirm the two stay in sync.
1148 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1150 public int getSnapshotChunkSize() {
1155 leader = new Leader(actorContext);
// Arbitrary map that is serialized below and used as the snapshot payload.
1157 Map<String, String> leadersSnapshot = new HashMap<>();
1158 leadersSnapshot.put("1", "A");
1159 leadersSnapshot.put("2", "B");
1160 leadersSnapshot.put("3", "C");
1162 ByteString bs = toByteString(leadersSnapshot);
1163 byte[] barray = bs.toByteArray();
1165 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1166 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
// Sanity check: the ByteString and its byte[] copy have the same length.
1168 assertEquals(bs.size(), barray.length);
// i is the start offset of the current chunk; the final chunk may be shorter than 50 bytes.
1171 for (int i=0; i < barray.length; i = i + 50) {
1175 if (i + 50 > barray.length) {
// The chunk must span j - i bytes and the tracker must report the expected chunk index.
1179 ByteString chunk = fts.getNextChunk();
1180 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
1181 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
// Mark the chunk as sent successfully and advance unless this was the last chunk.
1183 fts.markSendStatus(true);
1184 if (!fts.isLastChunk(chunkIndex)) {
1185 fts.incrementChunkIndex();
// The number of chunks walked must equal the total reported by the tracker.
1189 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
// Creates the behavior under test: a Leader built on the given actor context.
1192 @Override protected RaftActorBehavior createBehavior(
1193 RaftActorContext actorContext) {
1194 return new Leader(actorContext);
// Creates an actor context bound to leaderActor (delegates to createActorContext(ActorRef)).
1198 protected MockRaftActorContext createActorContext() {
1199 return createActorContext(leaderActor);
// Creates an actor context with id LEADER_ID bound to the given actor reference.
1203 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1204 return createActorContext(LEADER_ID, actorRef);
// Creates the leader's actor context and registers a single peer: FOLLOWER_ID mapped to the path of
// followerActor.
1207 private MockRaftActorContext createActorContextWithFollower() {
1208 MockRaftActorContext actorContext = createActorContext();
1209 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1210 followerActor.path().toString()).build());
1211 return actorContext;
// Builds a MockRaftActorContext for the given id and actor, using the test's actor system, a 50 ms
// heartbeat interval, an election timeout factor of 100000 and the test's payloadVersion.
1214 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1215 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1216 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
// Very large factor - presumably so that no election timeout fires during a test (TODO confirm).
1217 configParams.setElectionTimeoutFactor(100000);
1218 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1219 context.setConfigParams(configParams);
1220 context.setPayloadVersion(payloadVersion);
// Creates an actor context for the follower (FOLLOWER_ID / followerActor) whose only peer is the
// leader (LEADER_ID mapped to the path of leaderActor).
1224 private MockRaftActorContext createFollowerActorContextWithLeader() {
1225 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Replace the config installed by createActorContext with a fresh one that only sets the election
// timeout factor. The 50 ms heartbeat interval configured there is therefore not carried over; the
// follower uses whatever DefaultConfigParamsImpl defaults to.
1226 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1227 followerConfig.setElectionTimeoutFactor(10000);
1228 followerActorContext.setConfigParams(followerConfig);
1229 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1230 return followerActorContext;
// Verifies the initial AppendEntries sent by a newly created Leader whose commit index (1) is lower
// than its last log index, and the reply of a follower holding an identical log and commit index.
1234 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1235 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1237 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1239 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Let followerActor process incoming messages with a real Follower behavior.
1241 Follower follower = new Follower(followerActorContext);
1242 followerActor.underlyingActor().setBehavior(follower);
// NOTE(review): createActorContextWithFollower() already registered this same FOLLOWER_ID mapping, so
// setting the peer addresses again appears redundant.
1244 Map<String, String> peerAddresses = new HashMap<>();
1245 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1247 leaderActorContext.setPeerAddresses(peerAddresses);
1249 leaderActorContext.getReplicatedLog().removeFrom(0);
// Leader log built with createEntries(0, 3, 1); the assertions below show last index 2 and term 1.
1252 leaderActorContext.setReplicatedLog(
1253 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1255 leaderActorContext.setCommitIndex(1);
1257 followerActorContext.getReplicatedLog().removeFrom(0);
1259 // follower too has the exact same log entries and has the same commit index
1260 followerActorContext.setReplicatedLog(
1261 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1263 followerActorContext.setCommitIndex(1);
1265 leader = new Leader(leaderActorContext);
// The initial AppendEntries carries the leader's commit index, no entries, and prevLogIndex 0.
1267 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1269 assertEquals(1, appendEntries.getLeaderCommit());
1270 assertEquals(0, appendEntries.getEntries().size());
1271 assertEquals(0, appendEntries.getPrevLogIndex());
// The follower's reply reports its last log index and term.
1273 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1274 leaderActor, AppendEntriesReply.class);
1276 assertEquals(2, appendEntriesReply.getLogLastIndex());
1277 assertEquals(1, appendEntriesReply.getLogLastTerm());
1279 // NOTE(review): the two assertions below repeat the pair directly above (last log index and term); nothing else is checked
1280 assertEquals(2, appendEntriesReply.getLogLastIndex());
1281 assertEquals(1, appendEntriesReply.getLogLastTerm());
// Verifies the exchange between a Leader whose commit index (1) is lower than the follower's (2)
// although both hold the same log: after the first reply is handled, the next heartbeat carries
// leaderCommit 2 and prevLogIndex 2, and the follower's commit index stays at 2.
1287 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1288 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1290 MockRaftActorContext leaderActorContext = createActorContext();
1292 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1293 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
// Let followerActor process incoming messages with a real Follower behavior.
1295 Follower follower = new Follower(followerActorContext);
1296 followerActor.underlyingActor().setBehavior(follower);
1298 Map<String, String> leaderPeerAddresses = new HashMap<>();
1299 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1301 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1303 leaderActorContext.getReplicatedLog().removeFrom(0);
// Leader log built with createEntries(0, 3, 1); leader commit index is 1.
1305 leaderActorContext.setReplicatedLog(
1306 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1308 leaderActorContext.setCommitIndex(1);
1310 followerActorContext.getReplicatedLog().removeFrom(0);
1312 followerActorContext.setReplicatedLog(
1313 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1315 // follower has the same log entries but its commit index > leader's commit index
1316 followerActorContext.setCommitIndex(2);
1318 leader = new Leader(leaderActorContext);
1320 // Initial heartbeat
1321 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1323 assertEquals(1, appendEntries.getLeaderCommit());
1324 assertEquals(0, appendEntries.getEntries().size());
1325 assertEquals(0, appendEntries.getPrevLogIndex());
1327 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1328 leaderActor, AppendEntriesReply.class);
1330 assertEquals(2, appendEntriesReply.getLogLastIndex());
1331 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): this installs the Follower behavior on leaderActor, whereas the
// testHandleAppendEntriesReplyFailure* tests install the leader at the equivalent step - confirm
// this is intended. The reply is fed to the leader explicitly on the next line.
1333 leaderActor.underlyingActor().setBehavior(follower);
1334 leader.handleMessage(followerActor, appendEntriesReply);
// Discard everything collected so far so the expectations below only see the next exchange.
1336 leaderActor.underlyingActor().clear();
1337 followerActor.underlyingActor().clear();
// Wait one heartbeat interval before triggering the heartbeat manually.
1339 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1340 TimeUnit.MILLISECONDS);
1342 leader.handleMessage(leaderActor, new SendHeartBeat());
1344 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// The heartbeat now reports leaderCommit 2 and prevLogIndex 2, still with no entries.
1346 assertEquals(2, appendEntries.getLeaderCommit());
1347 assertEquals(0, appendEntries.getEntries().size());
1348 assertEquals(2, appendEntries.getPrevLogIndex());
1350 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1352 assertEquals(2, appendEntriesReply.getLogLastIndex());
1353 assertEquals(1, appendEntriesReply.getLogLastTerm());
1355 assertEquals(2, followerActorContext.getCommitIndex());
// Verifies that when the follower's log (built with createEntries(0, 1, 1)) is behind the leader's
// (createEntries(0, 3, 1)), handling the follower's reply to the initial AppendEntries makes the
// leader send the entries at index 1 and 2, which the follower then applies.
1361 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1362 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1364 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// Very long heartbeat interval - presumably so that only the messages triggered explicitly by this
// test are exchanged (TODO confirm).
1365 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1366 new FiniteDuration(1000, TimeUnit.SECONDS));
1368 leaderActorContext.setReplicatedLog(
1369 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1370 long leaderCommitIndex = 2;
1371 leaderActorContext.setCommitIndex(leaderCommitIndex);
1372 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the leader's entries at index 1 and 2 for the data comparisons below.
1374 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1375 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1377 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1379 followerActorContext.setReplicatedLog(
1380 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1381 followerActorContext.setCommitIndex(0);
1382 followerActorContext.setLastApplied(0);
// Let followerActor process incoming messages with a real Follower behavior.
1384 Follower follower = new Follower(followerActorContext);
1385 followerActor.underlyingActor().setBehavior(follower);
1387 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply, then clear both collectors.
1389 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1390 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1392 MessageCollectorActor.clearMessages(followerActor);
1393 MessageCollectorActor.clearMessages(leaderActor);
1395 // Verify initial AppendEntries sent with the leader's current commit index.
1396 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1397 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1398 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
// From here on leaderActor forwards messages to the leader behavior; the captured reply is also fed
// to the leader explicitly.
1400 leaderActor.underlyingActor().setBehavior(leader);
1402 leader.handleMessage(followerActor, appendEntriesReply);
// Wait for one AppendEntriesReply at leaderActor and pick up the AppendEntries sent to the follower.
1404 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1405 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1407 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1408 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1409 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1411 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1412 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1413 appendEntries.getEntries().get(0).getData());
1414 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1415 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1416 appendEntries.getEntries().get(1).getData());
// The leader's next index for the follower must now be 3.
1418 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1419 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
// The follower must have emitted ApplyState for both received entries.
1421 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1423 ApplyState applyState = applyStateList.get(0);
1424 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1425 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1426 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1427 applyState.getReplicatedLogEntry().getData());
1429 applyState = applyStateList.get(1);
1430 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1431 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1432 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1433 applyState.getReplicatedLogEntry().getData());
1435 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1436 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
// Verifies that when the follower's log is empty (commit index and last applied both -1), handling the
// follower's reply to the initial AppendEntries makes the leader send its entries at index 0 and 1,
// which the follower then applies.
1440 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1441 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1443 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// Very long heartbeat interval - presumably so that only the messages triggered explicitly by this
// test are exchanged (TODO confirm).
1444 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1445 new FiniteDuration(1000, TimeUnit.SECONDS));
1447 leaderActorContext.setReplicatedLog(
1448 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1449 long leaderCommitIndex = 1;
1450 leaderActorContext.setCommitIndex(leaderCommitIndex);
1451 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the leader's entries at index 0 and 1 for the data comparisons below.
1453 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1454 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1456 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// Follower log built without any createEntries call.
1458 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1459 followerActorContext.setCommitIndex(-1);
1460 followerActorContext.setLastApplied(-1);
// Let followerActor process incoming messages with a real Follower behavior.
1462 Follower follower = new Follower(followerActorContext);
1463 followerActor.underlyingActor().setBehavior(follower);
1465 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply, then clear both collectors.
1467 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1468 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1470 MessageCollectorActor.clearMessages(followerActor);
1471 MessageCollectorActor.clearMessages(leaderActor);
1473 // Verify initial AppendEntries sent with the leader's current commit index.
1474 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1475 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1476 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
// From here on leaderActor forwards messages to the leader behavior; the captured reply is also fed
// to the leader explicitly.
1478 leaderActor.underlyingActor().setBehavior(leader);
1480 leader.handleMessage(followerActor, appendEntriesReply);
// Wait for one AppendEntriesReply at leaderActor and pick up the AppendEntries sent to the follower.
1482 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1483 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1485 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1486 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1487 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1489 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1490 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1491 appendEntries.getEntries().get(0).getData());
1492 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1493 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1494 appendEntries.getEntries().get(1).getData());
// The leader's next index for the follower must now be 2.
1496 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1497 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
// The follower must have emitted ApplyState for both received entries.
1499 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1501 ApplyState applyState = applyStateList.get(0);
1502 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1503 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1504 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1505 applyState.getReplicatedLogEntry().getData());
1507 applyState = applyStateList.get(1);
1508 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1509 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1510 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1511 applyState.getReplicatedLogEntry().getData());
1513 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1514 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1518 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1519 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1521 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1522 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1523 new FiniteDuration(1000, TimeUnit.SECONDS));
1525 leaderActorContext.setReplicatedLog(
1526 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1527 long leaderCommitIndex = 1;
1528 leaderActorContext.setCommitIndex(leaderCommitIndex);
1529 leaderActorContext.setLastApplied(leaderCommitIndex);
1531 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1532 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1534 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1536 followerActorContext.setReplicatedLog(
1537 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1538 followerActorContext.setCommitIndex(-1);
1539 followerActorContext.setLastApplied(-1);
1541 Follower follower = new Follower(followerActorContext);
1542 followerActor.underlyingActor().setBehavior(follower);
1544 leader = new Leader(leaderActorContext);
1546 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1547 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1549 MessageCollectorActor.clearMessages(followerActor);
1550 MessageCollectorActor.clearMessages(leaderActor);
1552 // Verify initial AppendEntries sent with the leader's current commit index.
1553 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1554 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1555 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1557 leaderActor.underlyingActor().setBehavior(leader);
1559 leader.handleMessage(followerActor, appendEntriesReply);
1561 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1562 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1564 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1565 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1566 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1568 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1569 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1570 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1571 appendEntries.getEntries().get(0).getData());
1572 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1573 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1574 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1575 appendEntries.getEntries().get(1).getData());
1577 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1578 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1580 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1582 ApplyState applyState = applyStateList.get(0);
1583 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1584 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1585 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1586 applyState.getReplicatedLogEntry().getData());
1588 applyState = applyStateList.get(1);
1589 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1590 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1591 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1592 applyState.getReplicatedLogEntry().getData());
1594 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1595 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1596 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1600 public void testHandleAppendEntriesReplyWithNewerTerm(){
1601 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1603 MockRaftActorContext leaderActorContext = createActorContext();
1604 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1605 new FiniteDuration(10000, TimeUnit.SECONDS));
1607 leaderActorContext.setReplicatedLog(
1608 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1610 leader = new Leader(leaderActorContext);
1611 leaderActor.underlyingActor().setBehavior(leader);
1612 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1614 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1616 assertEquals(false, appendEntriesReply.isSuccess());
1617 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1619 MessageCollectorActor.clearMessages(leaderActor);
1623 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1624 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1626 MockRaftActorContext leaderActorContext = createActorContext();
1627 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1628 new FiniteDuration(10000, TimeUnit.SECONDS));
1630 leaderActorContext.setReplicatedLog(
1631 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1632 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1634 leader = new Leader(leaderActorContext);
1635 leaderActor.underlyingActor().setBehavior(leader);
1636 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1638 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1640 assertEquals(false, appendEntriesReply.isSuccess());
1641 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1643 MessageCollectorActor.clearMessages(leaderActor);
1647 public void testHandleAppendEntriesReplySuccess() throws Exception {
1648 logStart("testHandleAppendEntriesReplySuccess");
1650 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1652 leaderActorContext.setReplicatedLog(
1653 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1655 leaderActorContext.setCommitIndex(1);
1656 leaderActorContext.setLastApplied(1);
1657 leaderActorContext.getTermInformation().update(1, "leader");
1659 leader = new Leader(leaderActorContext);
1661 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1663 short payloadVersion = 5;
1664 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1666 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1668 assertEquals(RaftState.Leader, raftActorBehavior.state());
1670 assertEquals(2, leaderActorContext.getCommitIndex());
1672 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1673 leaderActor, ApplyJournalEntries.class);
1675 assertEquals(2, leaderActorContext.getLastApplied());
1677 assertEquals(2, applyJournalEntries.getToIndex());
1679 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1682 assertEquals(1,applyStateList.size());
1684 ApplyState applyState = applyStateList.get(0);
1686 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1688 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1689 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1693 public void testHandleAppendEntriesReplyUnknownFollower(){
1694 logStart("testHandleAppendEntriesReplyUnknownFollower");
1696 MockRaftActorContext leaderActorContext = createActorContext();
1698 leader = new Leader(leaderActorContext);
1700 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1702 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1704 assertEquals(RaftState.Leader, raftActorBehavior.state());
1708 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1709 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1711 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1712 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1713 new FiniteDuration(1000, TimeUnit.SECONDS));
1714 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1716 leaderActorContext.setReplicatedLog(
1717 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1718 long leaderCommitIndex = 3;
1719 leaderActorContext.setCommitIndex(leaderCommitIndex);
1720 leaderActorContext.setLastApplied(leaderCommitIndex);
1722 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1723 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1724 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1725 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1727 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1729 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1730 followerActorContext.setCommitIndex(-1);
1731 followerActorContext.setLastApplied(-1);
1733 Follower follower = new Follower(followerActorContext);
1734 followerActor.underlyingActor().setBehavior(follower);
1736 leader = new Leader(leaderActorContext);
1738 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1739 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1741 MessageCollectorActor.clearMessages(followerActor);
1742 MessageCollectorActor.clearMessages(leaderActor);
1744 // Verify initial AppendEntries sent with the leader's current commit index.
1745 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1746 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1747 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1749 leaderActor.underlyingActor().setBehavior(leader);
1751 leader.handleMessage(followerActor, appendEntriesReply);
1753 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1754 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1756 appendEntries = appendEntriesList.get(0);
1757 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1758 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1759 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1761 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1762 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1763 appendEntries.getEntries().get(0).getData());
1764 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1765 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1766 appendEntries.getEntries().get(1).getData());
1768 appendEntries = appendEntriesList.get(1);
1769 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1770 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1771 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1773 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1774 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1775 appendEntries.getEntries().get(0).getData());
1776 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1777 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1778 appendEntries.getEntries().get(1).getData());
1780 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1781 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1783 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1785 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1786 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1790 public void testHandleRequestVoteReply(){
1791 logStart("testHandleRequestVoteReply");
1793 MockRaftActorContext leaderActorContext = createActorContext();
1795 leader = new Leader(leaderActorContext);
1797 // Should be a no-op.
1798 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1799 new RequestVoteReply(1, true));
1801 assertEquals(RaftState.Leader, raftActorBehavior.state());
1803 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1805 assertEquals(RaftState.Leader, raftActorBehavior.state());
1809 public void testIsolatedLeaderCheckNoFollowers() {
1810 logStart("testIsolatedLeaderCheckNoFollowers");
1812 MockRaftActorContext leaderActorContext = createActorContext();
1814 leader = new Leader(leaderActorContext);
1815 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1816 Assert.assertTrue(behavior instanceof Leader);
1819 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1820 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1821 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1823 MockRaftActorContext leaderActorContext = createActorContext();
1825 Map<String, String> peerAddresses = new HashMap<>();
1826 peerAddresses.put("follower-1", followerActor1.path().toString());
1827 peerAddresses.put("follower-2", followerActor2.path().toString());
1829 leaderActorContext.setPeerAddresses(peerAddresses);
1830 leaderActorContext.setRaftPolicy(raftPolicy);
1832 leader = new Leader(leaderActorContext);
1834 leader.markFollowerActive("follower-1");
1835 leader.markFollowerActive("follower-2");
1836 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1837 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1838 behavior instanceof Leader);
1840 // kill 1 follower and verify if that got killed
1841 final JavaTestKit probe = new JavaTestKit(getSystem());
1842 probe.watch(followerActor1);
1843 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1844 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1845 assertEquals(termMsg1.getActor(), followerActor1);
1847 leader.markFollowerInActive("follower-1");
1848 leader.markFollowerActive("follower-2");
1849 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1850 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1851 behavior instanceof Leader);
1853 // kill 2nd follower and leader should change to Isolated leader
1854 followerActor2.tell(PoisonPill.getInstance(), null);
1855 probe.watch(followerActor2);
1856 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1857 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1858 assertEquals(termMsg2.getActor(), followerActor2);
1860 leader.markFollowerInActive("follower-2");
1861 return leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1865 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1866 logStart("testIsolatedLeaderCheckTwoFollowers");
1868 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1870 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1871 behavior instanceof IsolatedLeader);
1875 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1876 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1878 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1880 Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1881 behavior instanceof Leader);
1885 public void testLaggingFollowerStarvation() throws Exception {
1886 logStart("testLaggingFollowerStarvation");
1887 new JavaTestKit(getSystem()) {{
1888 String leaderActorId = actorFactory.generateActorId("leader");
1889 String follower1ActorId = actorFactory.generateActorId("follower");
1890 String follower2ActorId = actorFactory.generateActorId("follower");
1892 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1893 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1894 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1895 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1897 MockRaftActorContext leaderActorContext =
1898 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1900 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1901 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1902 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1904 leaderActorContext.setConfigParams(configParams);
1906 leaderActorContext.setReplicatedLog(
1907 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1909 Map<String, String> peerAddresses = new HashMap<>();
1910 peerAddresses.put(follower1ActorId,
1911 follower1Actor.path().toString());
1912 peerAddresses.put(follower2ActorId,
1913 follower2Actor.path().toString());
1915 leaderActorContext.setPeerAddresses(peerAddresses);
1916 leaderActorContext.getTermInformation().update(1, leaderActorId);
1918 RaftActorBehavior leader = createBehavior(leaderActorContext);
1920 leaderActor.underlyingActor().setBehavior(leader);
1922 for(int i=1;i<6;i++) {
1923 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1924 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1925 assertTrue(newBehavior == leader);
1926 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1929 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1930 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1932 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1933 heartbeats.size() > 1);
1935 // Check if follower-2 got AppendEntries during this time and was not starved
1936 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1938 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1939 appendEntries.size() > 1);
1945 public void testReplicationConsensusWithNonVotingFollower() {
1946 logStart("testReplicationConsensusWithNonVotingFollower");
1948 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1949 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1950 new FiniteDuration(1000, TimeUnit.SECONDS));
1952 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1954 String nonVotingFollowerId = "nonvoting-follower";
1955 TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1956 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1958 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1960 leader = new Leader(leaderActorContext);
1962 // Ignore initial heartbeats
1963 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1964 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1966 MessageCollectorActor.clearMessages(followerActor);
1967 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1968 MessageCollectorActor.clearMessages(leaderActor);
1970 // Send a Replicate message and wait for AppendEntries.
1971 sendReplicate(leaderActorContext, 0);
1973 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1974 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1976 // Send reply only from the voting follower and verify consensus via ApplyState.
1977 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1979 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1981 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
1983 MessageCollectorActor.clearMessages(followerActor);
1984 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1985 MessageCollectorActor.clearMessages(leaderActor);
1987 // Send another Replicate message
1988 sendReplicate(leaderActorContext, 1);
1990 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1991 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
1992 AppendEntries.class);
1993 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1994 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
1996 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
1997 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
1999 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2001 // Send reply from the voting follower and verify consensus.
2002 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2004 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2008 public void testTransferLeadershipWithFollowerInSync() {
2009 logStart("testTransferLeadershipWithFollowerInSync");
2011 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2012 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2013 new FiniteDuration(1000, TimeUnit.SECONDS));
2014 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2016 leader = new Leader(leaderActorContext);
2018 // Initial heartbeat
2019 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2020 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2021 MessageCollectorActor.clearMessages(followerActor);
2023 sendReplicate(leaderActorContext, 0);
2024 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2026 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2027 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2028 MessageCollectorActor.clearMessages(followerActor);
2030 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2031 leader.transferLeadership(mockTransferCohort);
2033 verify(mockTransferCohort, never()).transferComplete();
2034 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2035 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2037 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2038 MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2040 // Leader should force an election timeout
2041 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2043 verify(mockTransferCohort).transferComplete();
2047 public void testTransferLeadershipWithEmptyLog() {
2048 logStart("testTransferLeadershipWithEmptyLog");
2050 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2051 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2052 new FiniteDuration(1000, TimeUnit.SECONDS));
2053 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2055 leader = new Leader(leaderActorContext);
2057 // Initial heartbeat
2058 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2059 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2060 MessageCollectorActor.clearMessages(followerActor);
2062 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2063 leader.transferLeadership(mockTransferCohort);
2065 verify(mockTransferCohort, never()).transferComplete();
2066 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2067 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2069 // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2070 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2072 // Leader should force an election timeout
2073 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2075 verify(mockTransferCohort).transferComplete();
2079 public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2080 logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2082 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2083 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2084 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2086 leader = new Leader(leaderActorContext);
2088 // Initial heartbeat
2089 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2090 MessageCollectorActor.clearMessages(followerActor);
2092 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2093 leader.transferLeadership(mockTransferCohort);
2095 verify(mockTransferCohort, never()).transferComplete();
2097 // Sync up the follower.
2098 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2099 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2100 MessageCollectorActor.clearMessages(followerActor);
2102 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2103 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2104 leader.handleMessage(leaderActor, new SendHeartBeat());
2105 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2106 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2108 // Leader should force an election timeout
2109 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2111 verify(mockTransferCohort).transferComplete();
2115 public void testTransferLeadershipWithFollowerSyncTimeout() {
2116 logStart("testTransferLeadershipWithFollowerSyncTimeout");
2118 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2119 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2120 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2121 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2122 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2124 leader = new Leader(leaderActorContext);
2126 // Initial heartbeat
2127 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2128 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2129 MessageCollectorActor.clearMessages(followerActor);
2131 sendReplicate(leaderActorContext, 0);
2132 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2134 MessageCollectorActor.clearMessages(followerActor);
2136 RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2137 leader.transferLeadership(mockTransferCohort);
2139 verify(mockTransferCohort, never()).transferComplete();
2141 // Send heartbeats to time out the transfer.
2142 for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2143 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2144 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2145 leader.handleMessage(leaderActor, new SendHeartBeat());
2148 verify(mockTransferCohort).abortTransfer();
2149 verify(mockTransferCohort, never()).transferComplete();
2150 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2154 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
2155 ActorRef actorRef, RaftRPC rpc) throws Exception {
2156 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2157 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2160 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2162 private final long electionTimeOutIntervalMillis;
2163 private final int snapshotChunkSize;
2165 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2167 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2168 this.snapshotChunkSize = snapshotChunkSize;
2172 public FiniteDuration getElectionTimeOutInterval() {
2173 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2177 public int getSnapshotChunkSize() {
2178 return snapshotChunkSize;