2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.actor.Terminated;
18 import akka.testkit.JavaTestKit;
19 import akka.testkit.TestActorRef;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.util.concurrent.Uninterruptibles;
22 import com.google.protobuf.ByteString;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.List;
27 import java.util.concurrent.TimeUnit;
28 import org.junit.After;
29 import org.junit.Assert;
30 import org.junit.Test;
31 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
32 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
33 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
34 import org.opendaylight.controller.cluster.raft.RaftActorContext;
35 import org.opendaylight.controller.cluster.raft.RaftState;
36 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
37 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
38 import org.opendaylight.controller.cluster.raft.SerializationUtils;
39 import org.opendaylight.controller.cluster.raft.Snapshot;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
41 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
42 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
44 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
45 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
46 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
47 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
48 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
49 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
50 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
51 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
52 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
53 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
54 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
55 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
56 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
57 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
58 import scala.concurrent.duration.FiniteDuration;
60 public class LeaderTest extends AbstractLeaderTest {
// Peer id used for the follower in AppendEntriesReply messages and in leader.getFollower(...) lookups.
62 static final String FOLLOWER_ID = "follower";
// Id string for the leader. NOTE(review): not referenced in the visible part of this file.
63 public static final String LEADER_ID = "leader";
// Test actor passed as the first argument of leader.handleMessage(...) when the tests drive the leader
// directly; ApplyState messages are also collected from it.
65 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
66 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
// Test actor standing in for the follower; the tests collect AppendEntries and InstallSnapshot
// messages from it and clear it between phases.
68 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
69 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
// Leader behavior under test; each test assigns a newly constructed instance.
71 private Leader leader;
// Payload version value (5) available to tests that configure the actor context.
72 private final short payloadVersion = 5;
// Presumably the JUnit @After cleanup hook (org.junit.After is imported) - TODO confirm.
// NOTE(review): the body of this method is not present in the visible listing, so what it releases
// cannot be described from here.
76 public void tearDown() throws Exception {
85 public void testHandleMessageForUnknownMessage() throws Exception {
86 logStart("testHandleMessageForUnknownMessage");
88 leader = new Leader(createActorContext());
90 // handle message should return the Leader state when it receives an
92 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
93 Assert.assertTrue(behavior instanceof Leader);
97 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
98 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
100 MockRaftActorContext actorContext = createActorContextWithFollower();
101 short payloadVersion = (short)5;
102 actorContext.setPayloadVersion(payloadVersion);
105 actorContext.getTermInformation().update(term, "");
107 leader = new Leader(actorContext);
109 // Leader should send an immediate heartbeat with no entries as follower is inactive.
110 long lastIndex = actorContext.getReplicatedLog().lastIndex();
111 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
112 assertEquals("getTerm", term, appendEntries.getTerm());
113 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
114 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
115 assertEquals("Entries size", 0, appendEntries.getEntries().size());
116 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
118 // The follower would normally reply - simulate that explicitly here.
119 leader.handleMessage(followerActor, new AppendEntriesReply(
120 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
121 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
123 followerActor.underlyingActor().clear();
125 // Sleep for the heartbeat interval so AppendEntries is sent.
126 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
127 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
129 leader.handleMessage(leaderActor, new SendHeartBeat());
131 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
132 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
133 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
134 assertEquals("Entries size", 1, appendEntries.getEntries().size());
135 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
136 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
137 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
141 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
142 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
143 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
145 actorContext.getReplicatedLog().append(newEntry);
146 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
150 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
151 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
153 MockRaftActorContext actorContext = createActorContextWithFollower();
156 actorContext.getTermInformation().update(term, "");
158 leader = new Leader(actorContext);
160 // Leader will send an immediate heartbeat - ignore it.
161 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
163 // The follower would normally reply - simulate that explicitly here.
164 long lastIndex = actorContext.getReplicatedLog().lastIndex();
165 leader.handleMessage(followerActor, new AppendEntriesReply(
166 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
167 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
169 followerActor.underlyingActor().clear();
171 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
173 // State should not change
174 assertTrue(raftBehavior instanceof Leader);
176 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
177 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
178 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
179 assertEquals("Entries size", 1, appendEntries.getEntries().size());
180 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
181 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
182 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
183 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
187 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
188 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
190 MockRaftActorContext actorContext = createActorContextWithFollower();
191 actorContext.setRaftPolicy(createRaftPolicy(true, true));
194 actorContext.getTermInformation().update(term, "");
196 leader = new Leader(actorContext);
198 // Leader will send an immediate heartbeat - ignore it.
199 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
201 // The follower would normally reply - simulate that explicitly here.
202 long lastIndex = actorContext.getReplicatedLog().lastIndex();
203 leader.handleMessage(followerActor, new AppendEntriesReply(
204 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
205 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
207 followerActor.underlyingActor().clear();
209 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
211 // State should not change
212 assertTrue(raftBehavior instanceof Leader);
214 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
215 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
216 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
217 assertEquals("Entries size", 1, appendEntries.getEntries().size());
218 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
219 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
220 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
221 assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
225 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
226 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
228 MockRaftActorContext actorContext = createActorContextWithFollower();
229 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
231 public FiniteDuration getHeartBeatInterval() {
232 return FiniteDuration.apply(5, TimeUnit.SECONDS);
237 actorContext.getTermInformation().update(term, "");
239 leader = new Leader(actorContext);
241 // Leader will send an immediate heartbeat - ignore it.
242 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
244 // The follower would normally reply - simulate that explicitly here.
245 long lastIndex = actorContext.getReplicatedLog().lastIndex();
246 leader.handleMessage(followerActor, new AppendEntriesReply(
247 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
248 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
250 followerActor.underlyingActor().clear();
252 for(int i=0;i<5;i++) {
253 sendReplicate(actorContext, lastIndex+i+1);
256 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
257 // We expect only 1 message to be sent because of two reasons,
258 // - an append entries reply was not received
259 // - the heartbeat interval has not expired
260 // In this scenario if multiple messages are sent they would likely be duplicates
261 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
265 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
266 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
268 MockRaftActorContext actorContext = createActorContextWithFollower();
269 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
271 public FiniteDuration getHeartBeatInterval() {
272 return FiniteDuration.apply(5, TimeUnit.SECONDS);
277 actorContext.getTermInformation().update(term, "");
279 leader = new Leader(actorContext);
281 // Leader will send an immediate heartbeat - ignore it.
282 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
284 // The follower would normally reply - simulate that explicitly here.
285 long lastIndex = actorContext.getReplicatedLog().lastIndex();
286 leader.handleMessage(followerActor, new AppendEntriesReply(
287 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
288 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
290 followerActor.underlyingActor().clear();
292 for(int i=0;i<3;i++) {
293 sendReplicate(actorContext, lastIndex+i+1);
294 leader.handleMessage(followerActor, new AppendEntriesReply(
295 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
299 for(int i=3;i<5;i++) {
300 sendReplicate(actorContext, lastIndex + i + 1);
303 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
304 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
305 // get sent to the follower - but not the 5th
306 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
308 for(int i=0;i<4;i++) {
309 long expected = allMessages.get(i).getEntries().get(0).getIndex();
310 assertEquals(expected, i+2);
315 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
316 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
318 MockRaftActorContext actorContext = createActorContextWithFollower();
319 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
321 public FiniteDuration getHeartBeatInterval() {
322 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
327 actorContext.getTermInformation().update(term, "");
329 leader = new Leader(actorContext);
331 // Leader will send an immediate heartbeat - ignore it.
332 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
334 // The follower would normally reply - simulate that explicitly here.
335 long lastIndex = actorContext.getReplicatedLog().lastIndex();
336 leader.handleMessage(followerActor, new AppendEntriesReply(
337 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
338 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
340 followerActor.underlyingActor().clear();
342 sendReplicate(actorContext, lastIndex+1);
344 // Wait slightly longer than heartbeat duration
345 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
347 leader.handleMessage(leaderActor, new SendHeartBeat());
349 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
350 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
352 assertEquals(1, allMessages.get(0).getEntries().size());
353 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
354 assertEquals(1, allMessages.get(1).getEntries().size());
355 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
360 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
361 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
363 MockRaftActorContext actorContext = createActorContextWithFollower();
364 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
366 public FiniteDuration getHeartBeatInterval() {
367 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
372 actorContext.getTermInformation().update(term, "");
374 leader = new Leader(actorContext);
376 // Leader will send an immediate heartbeat - ignore it.
377 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
379 // The follower would normally reply - simulate that explicitly here.
380 long lastIndex = actorContext.getReplicatedLog().lastIndex();
381 leader.handleMessage(followerActor, new AppendEntriesReply(
382 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
383 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
385 followerActor.underlyingActor().clear();
387 for(int i=0;i<3;i++) {
388 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
389 leader.handleMessage(leaderActor, new SendHeartBeat());
392 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
393 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
397 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
398 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
400 MockRaftActorContext actorContext = createActorContextWithFollower();
401 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
403 public FiniteDuration getHeartBeatInterval() {
404 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
409 actorContext.getTermInformation().update(term, "");
411 leader = new Leader(actorContext);
413 // Leader will send an immediate heartbeat - ignore it.
414 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
416 // The follower would normally reply - simulate that explicitly here.
417 long lastIndex = actorContext.getReplicatedLog().lastIndex();
418 leader.handleMessage(followerActor, new AppendEntriesReply(
419 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
420 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
422 followerActor.underlyingActor().clear();
424 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
425 leader.handleMessage(leaderActor, new SendHeartBeat());
426 sendReplicate(actorContext, lastIndex+1);
428 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
429 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
431 assertEquals(0, allMessages.get(0).getEntries().size());
432 assertEquals(1, allMessages.get(1).getEntries().size());
437 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
438 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
440 MockRaftActorContext actorContext = createActorContext();
442 leader = new Leader(actorContext);
444 actorContext.setLastApplied(0);
446 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
447 long term = actorContext.getTermInformation().getCurrentTerm();
448 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
449 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
451 actorContext.getReplicatedLog().append(newEntry);
453 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
454 new Replicate(leaderActor, "state-id", newEntry));
456 // State should not change
457 assertTrue(raftBehavior instanceof Leader);
459 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
461 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
462 // one since lastApplied state is 0.
463 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
464 leaderActor, ApplyState.class);
465 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
467 for(int i = 0; i <= newLogIndex - 1; i++ ) {
468 ApplyState applyState = applyStateList.get(i);
469 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
470 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
473 ApplyState last = applyStateList.get((int) newLogIndex - 1);
474 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
475 assertEquals("getIdentifier", "state-id", last.getIdentifier());
479 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
480 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
482 MockRaftActorContext actorContext = createActorContextWithFollower();
484 Map<String, String> leadersSnapshot = new HashMap<>();
485 leadersSnapshot.put("1", "A");
486 leadersSnapshot.put("2", "B");
487 leadersSnapshot.put("3", "C");
490 actorContext.getReplicatedLog().removeFrom(0);
492 final int commitIndex = 3;
493 final int snapshotIndex = 2;
494 final int newEntryIndex = 4;
495 final int snapshotTerm = 1;
496 final int currentTerm = 2;
498 // set the snapshot variables in replicatedlog
499 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
500 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
501 actorContext.setCommitIndex(commitIndex);
502 //set follower timeout to 2 mins, helps during debugging
503 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
505 leader = new Leader(actorContext);
507 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
508 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
511 ReplicatedLogImplEntry entry =
512 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
513 new MockRaftActorContext.MockPayload("D"));
515 //update follower timestamp
516 leader.markFollowerActive(FOLLOWER_ID);
518 ByteString bs = toByteString(leadersSnapshot);
519 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
520 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
521 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
522 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
524 //send first chunk and no InstallSnapshotReply received yet
526 fts.incrementChunkIndex();
528 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
529 TimeUnit.MILLISECONDS);
531 leader.handleMessage(leaderActor, new SendHeartBeat());
533 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
535 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
537 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
539 //InstallSnapshotReply received
540 fts.markSendStatus(true);
542 leader.handleMessage(leaderActor, new SendHeartBeat());
544 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
546 assertEquals(commitIndex, is.getLastIncludedIndex());
550 public void testSendAppendEntriesSnapshotScenario() throws Exception {
551 logStart("testSendAppendEntriesSnapshotScenario");
553 MockRaftActorContext actorContext = createActorContextWithFollower();
555 Map<String, String> leadersSnapshot = new HashMap<>();
556 leadersSnapshot.put("1", "A");
557 leadersSnapshot.put("2", "B");
558 leadersSnapshot.put("3", "C");
561 actorContext.getReplicatedLog().removeFrom(0);
563 final int followersLastIndex = 2;
564 final int snapshotIndex = 3;
565 final int newEntryIndex = 4;
566 final int snapshotTerm = 1;
567 final int currentTerm = 2;
569 // set the snapshot variables in replicatedlog
570 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
571 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
572 actorContext.setCommitIndex(followersLastIndex);
574 leader = new Leader(actorContext);
576 // Leader will send an immediate heartbeat - ignore it.
577 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
580 ReplicatedLogImplEntry entry =
581 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
582 new MockRaftActorContext.MockPayload("D"));
584 actorContext.getReplicatedLog().append(entry);
586 //update follower timestamp
587 leader.markFollowerActive(FOLLOWER_ID);
589 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
590 RaftActorBehavior raftBehavior = leader.handleMessage(
591 leaderActor, new Replicate(null, "state-id", entry));
593 assertTrue(raftBehavior instanceof Leader);
595 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
599 public void testInitiateInstallSnapshot() throws Exception {
600 logStart("testInitiateInstallSnapshot");
602 MockRaftActorContext actorContext = createActorContextWithFollower();
605 actorContext.getReplicatedLog().removeFrom(0);
607 final int followersLastIndex = 2;
608 final int snapshotIndex = 3;
609 final int newEntryIndex = 4;
610 final int snapshotTerm = 1;
611 final int currentTerm = 2;
613 // set the snapshot variables in replicatedlog
614 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
615 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
616 actorContext.setLastApplied(3);
617 actorContext.setCommitIndex(followersLastIndex);
619 leader = new Leader(actorContext);
621 // Leader will send an immediate heartbeat - ignore it.
622 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
624 // set the snapshot as absent and check if capture-snapshot is invoked.
625 leader.setSnapshot(null);
628 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
629 new MockRaftActorContext.MockPayload("D"));
631 actorContext.getReplicatedLog().append(entry);
633 //update follower timestamp
634 leader.markFollowerActive(FOLLOWER_ID);
636 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
638 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
640 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
642 assertTrue(cs.isInstallSnapshotInitiated());
643 assertEquals(3, cs.getLastAppliedIndex());
644 assertEquals(1, cs.getLastAppliedTerm());
645 assertEquals(4, cs.getLastIndex());
646 assertEquals(2, cs.getLastTerm());
648 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
649 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
651 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
655 public void testInitiateForceInstallSnapshot() throws Exception {
656 logStart("testInitiateForceInstallSnapshot");
658 MockRaftActorContext actorContext = createActorContextWithFollower();
660 final int followersLastIndex = 2;
661 final int snapshotIndex = -1;
662 final int newEntryIndex = 4;
663 final int snapshotTerm = -1;
664 final int currentTerm = 2;
666 // set the snapshot variables in replicatedlog
667 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
668 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
669 actorContext.setLastApplied(3);
670 actorContext.setCommitIndex(followersLastIndex);
672 actorContext.getReplicatedLog().removeFrom(0);
674 leader = new Leader(actorContext);
676 // Leader will send an immediate heartbeat - ignore it.
677 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
679 // set the snapshot as absent and check if capture-snapshot is invoked.
680 leader.setSnapshot(null);
682 for(int i=0;i<4;i++) {
683 actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
684 new MockRaftActorContext.MockPayload("X" + i)));
688 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
689 new MockRaftActorContext.MockPayload("D"));
691 actorContext.getReplicatedLog().append(entry);
693 //update follower timestamp
694 leader.markFollowerActive(FOLLOWER_ID);
696 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
697 // installed with a SendInstallSnapshot
698 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
700 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
702 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
704 assertTrue(cs.isInstallSnapshotInitiated());
705 assertEquals(3, cs.getLastAppliedIndex());
706 assertEquals(1, cs.getLastAppliedTerm());
707 assertEquals(4, cs.getLastIndex());
708 assertEquals(2, cs.getLastTerm());
710 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
711 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
713 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
718 public void testInstallSnapshot() throws Exception {
719 logStart("testInstallSnapshot");
721 MockRaftActorContext actorContext = createActorContextWithFollower();
723 Map<String, String> leadersSnapshot = new HashMap<>();
724 leadersSnapshot.put("1", "A");
725 leadersSnapshot.put("2", "B");
726 leadersSnapshot.put("3", "C");
729 actorContext.getReplicatedLog().removeFrom(0);
731 final int lastAppliedIndex = 3;
732 final int snapshotIndex = 2;
733 final int snapshotTerm = 1;
734 final int currentTerm = 2;
736 // set the snapshot variables in replicatedlog
737 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
738 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
739 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
740 actorContext.setCommitIndex(lastAppliedIndex);
741 actorContext.setLastApplied(lastAppliedIndex);
743 leader = new Leader(actorContext);
745 // Initial heartbeat.
746 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
748 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
749 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
751 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
752 Collections.<ReplicatedLogEntry>emptyList(),
753 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
755 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
757 assertTrue(raftBehavior instanceof Leader);
759 // check if installsnapshot gets called with the correct values.
761 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
763 assertNotNull(installSnapshot.getData());
764 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
765 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
767 assertEquals(currentTerm, installSnapshot.getTerm());
// Verifies that a Leader handling SendInstallSnapshot, with the follower's match index and
// next index both forced to -1, stays Leader and delivers an InstallSnapshot to the follower
// whose last-included index/term and term match the values configured below.
771 public void testForceInstallSnapshot() throws Exception {
772 logStart("testForceInstallSnapshot");
774 MockRaftActorContext actorContext = createActorContextWithFollower();
// State that gets serialized as the snapshot payload.
776 Map<String, String> leadersSnapshot = new HashMap<>();
777 leadersSnapshot.put("1", "A");
778 leadersSnapshot.put("2", "B");
779 leadersSnapshot.put("3", "C");
781 final int lastAppliedIndex = 3;
782 final int snapshotIndex = -1;
783 final int snapshotTerm = -1;
784 final int currentTerm = 2;
786 // set the snapshot variables in replicatedlog
787 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
788 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
789 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
790 actorContext.setCommitIndex(lastAppliedIndex);
791 actorContext.setLastApplied(lastAppliedIndex);
793 leader = new Leader(actorContext);
795 // Initial heartbeat.
796 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Put the follower at -1/-1 before triggering the install.
798 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
799 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
// Snapshot built from the serialized map with an empty list of unapplied entries.
801 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
802 Collections.<ReplicatedLogEntry>emptyList(),
803 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
805 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
807 assertTrue(raftBehavior instanceof Leader);
809 // check if installsnapshot gets called with the correct values.
811 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
813 assertNotNull(installSnapshot.getData());
814 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
815 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
817 assertEquals(currentTerm, installSnapshot.getTerm());
// Verifies the Leader's handling of a successful InstallSnapshotReply for the chunk index the
// FollowerToSnapshot tracker has been advanced to: the behavior stays Leader, no follower
// snapshot remains tracked, and the follower's match/next indices become commitIndex and
// commitIndex + 1.
821 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
822 logStart("testHandleInstallSnapshotReplyLastChunk");
824 MockRaftActorContext actorContext = createActorContextWithFollower();
826 final int commitIndex = 3;
827 final int snapshotIndex = 2;
828 final int snapshotTerm = 1;
829 final int currentTerm = 2;
831 actorContext.setCommitIndex(commitIndex);
833 leader = new Leader(actorContext);
835 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
836 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
838 // Ignore initial heartbeat.
839 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
841 Map<String, String> leadersSnapshot = new HashMap<>();
842 leadersSnapshot.put("1", "A");
843 leadersSnapshot.put("2", "B");
844 leadersSnapshot.put("3", "C");
846 // set the snapshot variables in replicatedlog
848 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
849 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
850 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
// Install the snapshot and a per-follower chunk tracker on the leader directly,
// instead of going through a SendInstallSnapshot message.
852 ByteString bs = toByteString(leadersSnapshot);
853 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
854 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
855 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
856 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
// Advance the tracker until isLastChunk() reports that the current index is the final chunk.
857 while(!fts.isLastChunk(fts.getChunkIndex())) {
859 fts.incrementChunkIndex();
// Empty the leader's replicated log from index 0 before the reply is handled.
863 actorContext.getReplicatedLog().removeFrom(0);
865 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
866 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
868 assertTrue(raftBehavior instanceof Leader);
// The follower's snapshot tracker is gone, while the follower itself is still tracked.
870 assertEquals(0, leader.followerSnapshotSize());
871 assertEquals(1, leader.followerLogSize());
872 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
874 assertEquals(commitIndex, fli.getMatchIndex());
875 assertEquals(commitIndex + 1, fli.getNextIndex());
// Verifies that successive successful InstallSnapshotReply messages drive the Leader through
// the snapshot chunks: chunk 1 of 3, then chunk 2 of 3, then another InstallSnapshot, and
// after the reply to that one no further InstallSnapshot reaches the follower.
879 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
880 logStart("testSendSnapshotfromInstallSnapshotReply");
882 MockRaftActorContext actorContext = createActorContextWithFollower();
884 final int commitIndex = 3;
885 final int snapshotIndex = 2;
886 final int snapshotTerm = 1;
887 final int currentTerm = 2;
// Config with an overridden snapshot chunk size; the assertions below expect the
// serialized snapshot to be split into 3 chunks.
889 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
891 public int getSnapshotChunkSize() {
// Long intervals - presumably so periodic heartbeat / isolated-leader checks do not
// interfere with the message sequence under test (TODO confirm).
895 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
896 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
898 actorContext.setConfigParams(configParams);
899 actorContext.setCommitIndex(commitIndex);
901 leader = new Leader(actorContext);
903 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
904 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
906 Map<String, String> leadersSnapshot = new HashMap<>();
907 leadersSnapshot.put("1", "A");
908 leadersSnapshot.put("2", "B");
909 leadersSnapshot.put("3", "C");
911 // set the snapshot variables in replicatedlog
912 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
913 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
914 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
916 ByteString bs = toByteString(leadersSnapshot);
917 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
918 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
919 leader.setSnapshot(snapshot);
// Kick off the install; the first chunk should go out.
921 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
923 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
925 assertEquals(1, installSnapshot.getChunkIndex());
926 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 1; chunk 2 is expected next. The collector is cleared first so that
// expectFirstMatching only sees the new message.
928 followerActor.underlyingActor().clear();
929 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
930 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
932 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
934 assertEquals(2, installSnapshot.getChunkIndex());
935 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 2; another InstallSnapshot is expected (its chunk index is not asserted).
937 followerActor.underlyingActor().clear();
938 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
939 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
941 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
943 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
944 followerActor.underlyingActor().clear();
945 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
946 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
// getFirstMatching (rather than expectFirstMatching) is used because a null result is the
// expected outcome here.
948 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
950 Assert.assertNull(installSnapshot);
// Verifies that after an unsuccessful InstallSnapshotReply carrying chunk index -1, the next
// SendHeartBeat handled by the Leader causes chunk 1 of 3 to be sent to the follower again.
955 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
956 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
958 MockRaftActorContext actorContext = createActorContextWithFollower();
960 final int commitIndex = 3;
961 final int snapshotIndex = 2;
962 final int snapshotTerm = 1;
963 final int currentTerm = 2;
// Config with an overridden snapshot chunk size; the assertions below expect 3 chunks.
965 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
967 public int getSnapshotChunkSize() {
972 actorContext.setCommitIndex(commitIndex);
974 leader = new Leader(actorContext);
976 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
977 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
979 Map<String, String> leadersSnapshot = new HashMap<>();
980 leadersSnapshot.put("1", "A");
981 leadersSnapshot.put("2", "B");
982 leadersSnapshot.put("3", "C");
984 // set the snapshot variables in replicatedlog
985 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
986 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
987 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
989 ByteString bs = toByteString(leadersSnapshot);
990 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
991 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
992 leader.setSnapshot(snapshot);
// NOTE(review): the reason for this fixed 1 second pause is not evident from the test
// itself - confirm whether it is still required.
994 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
995 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
997 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
999 assertEquals(1, installSnapshot.getChunkIndex());
1000 assertEquals(3, installSnapshot.getTotalChunks());
1002 followerActor.underlyingActor().clear();
// Reply with success=false and a chunk index of -1.
1004 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1005 FOLLOWER_ID, -1, false));
// Wait one configured heartbeat interval before handing the leader a SendHeartBeat.
1007 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1008 TimeUnit.MILLISECONDS);
1010 leader.handleMessage(leaderActor, new SendHeartBeat());
// The first chunk is expected to be sent again.
1012 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1014 assertEquals(1, installSnapshot.getChunkIndex());
1015 assertEquals(3, installSnapshot.getTotalChunks());
// Verifies the last-chunk hash code carried by InstallSnapshot messages: the first chunk
// carries AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, and the second chunk carries the
// hashCode() of the first chunk's data.
1019 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1020 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1022 MockRaftActorContext actorContext = createActorContextWithFollower();
1024 final int commitIndex = 3;
1025 final int snapshotIndex = 2;
1026 final int snapshotTerm = 1;
1027 final int currentTerm = 2;
// Config with an overridden snapshot chunk size; the assertions below expect 3 chunks.
1029 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1031 public int getSnapshotChunkSize() {
1036 actorContext.setCommitIndex(commitIndex);
1038 leader = new Leader(actorContext);
1040 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1041 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1043 Map<String, String> leadersSnapshot = new HashMap<>();
1044 leadersSnapshot.put("1", "A");
1045 leadersSnapshot.put("2", "B");
1046 leadersSnapshot.put("3", "C");
1048 // set the snapshot variables in replicatedlog
1049 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1050 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1051 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1053 ByteString bs = toByteString(leadersSnapshot);
1054 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1055 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1056 leader.setSnapshot(snapshot);
1058 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1060 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1062 assertEquals(1, installSnapshot.getChunkIndex());
1063 assertEquals(3, installSnapshot.getTotalChunks());
1064 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
// Remember the first chunk's data hash so it can be compared with what chunk 2 reports.
1066 int hashCode = installSnapshot.getData().hashCode();
1068 followerActor.underlyingActor().clear();
// Acknowledge chunk 1 so that the leader sends chunk 2.
1070 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1071 FOLLOWER_ID, 1, true));
1073 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1075 assertEquals(2, installSnapshot.getChunkIndex());
1076 assertEquals(3, installSnapshot.getTotalChunks());
1077 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
// Exercises FollowerToSnapshot directly, without any message exchange: walks the serialized
// snapshot in steps of 50 bytes, checks the size and index of each chunk returned by
// getNextChunk(), and finally checks that getTotalChunks() equals the number of chunks counted.
1081 public void testFollowerToSnapshotLogic() {
1082 logStart("testFollowerToSnapshotLogic");
1084 MockRaftActorContext actorContext = createActorContext();
// Config with an overridden snapshot chunk size - presumably 50 to match the step used by
// the loop below (TODO confirm).
1086 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1088 public int getSnapshotChunkSize() {
1093 leader = new Leader(actorContext);
1095 Map<String, String> leadersSnapshot = new HashMap<>();
1096 leadersSnapshot.put("1", "A");
1097 leadersSnapshot.put("2", "B");
1098 leadersSnapshot.put("3", "C");
1100 ByteString bs = toByteString(leadersSnapshot);
1101 byte[] barray = bs.toByteArray();
1103 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1104 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
// Sanity check: the byte array copy has the same length as the ByteString.
1106 assertEquals(bs.size(), barray.length);
// i is the start offset of each expected chunk.
1109 for (int i=0; i < barray.length; i = i + 50) {
// Special case for a final chunk that would run past the end of the data.
1113 if (i + 50 > barray.length) {
1117 ByteString chunk = fts.getNextChunk();
1118 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
1119 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
// Mark the chunk as sent successfully and move on unless this was the last chunk.
1121 fts.markSendStatus(true);
1122 if (!fts.isLastChunk(chunkIndex)) {
1123 fts.incrementChunkIndex();
1127 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
// Creates the behavior under test: a new Leader built on the given actor context.
1130 @Override protected RaftActorBehavior createBehavior(
1131 RaftActorContext actorContext) {
1132 return new Leader(actorContext);
// Creates an actor context bound to leaderActor by delegating to createActorContext(ActorRef).
1136 protected MockRaftActorContext createActorContext() {
1137 return createActorContext(leaderActor);
// Creates an actor context for the given actor, using LEADER_ID as the context id.
1141 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1142 return createActorContext(LEADER_ID, actorRef);
// Creates the default leader context and sets its peer addresses to a map containing just
// FOLLOWER_ID mapped to followerActor's path.
1145 private MockRaftActorContext createActorContextWithFollower() {
1146 MockRaftActorContext actorContext = createActorContext();
1147 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1148 followerActor.path().toString()).build());
1149 return actorContext;
// Builds a MockRaftActorContext for the given id and actor on the test actor system,
// configured with a 50 ms heartbeat interval, an election timeout factor of 100000 and the
// test's payloadVersion.
1152 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1153 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1154 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
// Large factor - presumably to keep election timeouts from firing during a test (TODO confirm).
1155 configParams.setElectionTimeoutFactor(100000);
1156 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1157 context.setConfigParams(configParams);
1158 context.setPayloadVersion(payloadVersion);
// Creates a context for FOLLOWER_ID / followerActor whose only peer address is LEADER_ID
// mapped to leaderActor's path.
1162 private MockRaftActorContext createFollowerActorContextWithLeader() {
1163 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Replace the config params installed by createActorContext with a fresh default config
// that only sets the election timeout factor.
1164 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1165 followerConfig.setElectionTimeoutFactor(10000);
1166 followerActorContext.setConfigParams(followerConfig);
1167 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1168 return followerActorContext;
// Verifies the initial exchange when a Leader is created with commit index 1 and a follower
// that has the same log and commit index: the first AppendEntries carries leaderCommit 1,
// no entries and prevLogIndex 0, and the follower's reply reports last log index 2, term 1.
1172 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1173 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1175 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1177 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Let followerActor process incoming messages with a real Follower behavior.
1179 Follower follower = new Follower(followerActorContext);
1180 followerActor.underlyingActor().setBehavior(follower);
// NOTE(review): createActorContextWithFollower() already registered this peer address;
// this block sets an equivalent mapping again.
1182 Map<String, String> peerAddresses = new HashMap<>();
1183 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1185 leaderActorContext.setPeerAddresses(peerAddresses);
1187 leaderActorContext.getReplicatedLog().removeFrom(0);
// createEntries(0, 3, 1): presumably entries at indices 0..2 in term 1, which matches the
// last index/term asserted on the reply below (TODO confirm against MockReplicatedLogBuilder).
1190 leaderActorContext.setReplicatedLog(
1191 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1193 leaderActorContext.setCommitIndex(1);
1195 followerActorContext.getReplicatedLog().removeFrom(0);
1197 // follower too has the exact same log entries and has the same commit index
1198 followerActorContext.setReplicatedLog(
1199 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1201 followerActorContext.setCommitIndex(1);
1203 leader = new Leader(leaderActorContext);
1205 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1207 assertEquals(1, appendEntries.getLeaderCommit());
1208 assertEquals(0, appendEntries.getEntries().size());
1209 assertEquals(0, appendEntries.getPrevLogIndex());
1211 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1212 leaderActor, AppendEntriesReply.class);
1214 assertEquals(2, appendEntriesReply.getLogLastIndex());
1215 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): the two assertions below repeat the two directly above verbatim.
1217 // follower returns its next index
1218 assertEquals(2, appendEntriesReply.getLogLastIndex());
1219 assertEquals(1, appendEntriesReply.getLogLastTerm());
// Verifies the exchange when the follower's commit index (2) is ahead of the leader's (1):
// the initial AppendEntries carries leaderCommit 1 and prevLogIndex 0; after the leader
// handles the follower's reply and a SendHeartBeat, the next AppendEntries carries
// leaderCommit 2 and prevLogIndex 2, and the follower's commit index is still 2.
1225 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1226 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1228 MockRaftActorContext leaderActorContext = createActorContext();
1230 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1231 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
// Let followerActor process incoming messages with a real Follower behavior.
1233 Follower follower = new Follower(followerActorContext);
1234 followerActor.underlyingActor().setBehavior(follower);
1236 Map<String, String> leaderPeerAddresses = new HashMap<>();
1237 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1239 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1241 leaderActorContext.getReplicatedLog().removeFrom(0);
1243 leaderActorContext.setReplicatedLog(
1244 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1246 leaderActorContext.setCommitIndex(1);
1248 followerActorContext.getReplicatedLog().removeFrom(0);
1250 followerActorContext.setReplicatedLog(
1251 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1253 // follower has the same log entries but its commit index > leaders commit index
1254 followerActorContext.setCommitIndex(2);
1256 leader = new Leader(leaderActorContext);
1258 // Initial heartbeat
1259 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1261 assertEquals(1, appendEntries.getLeaderCommit());
1262 assertEquals(0, appendEntries.getEntries().size());
1263 assertEquals(0, appendEntries.getPrevLogIndex());
1265 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1266 leaderActor, AppendEntriesReply.class);
1268 assertEquals(2, appendEntriesReply.getLogLastIndex());
1269 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): this installs the Follower behavior on leaderActor; confirm that
// `follower` (rather than `leader`) is really intended here. The reply itself is fed to
// the leader manually on the next line.
1271 leaderActor.underlyingActor().setBehavior(follower);
1272 leader.handleMessage(followerActor, appendEntriesReply);
// Drop everything collected so far so the expectations below only see new messages.
1274 leaderActor.underlyingActor().clear();
1275 followerActor.underlyingActor().clear();
// Wait one configured heartbeat interval before handing the leader a SendHeartBeat.
1277 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1278 TimeUnit.MILLISECONDS);
1280 leader.handleMessage(leaderActor, new SendHeartBeat());
1282 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1284 assertEquals(2, appendEntries.getLeaderCommit());
1285 assertEquals(0, appendEntries.getEntries().size());
1286 assertEquals(2, appendEntries.getPrevLogIndex());
1288 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1290 assertEquals(2, appendEntriesReply.getLogLastIndex());
1291 assertEquals(1, appendEntriesReply.getLogLastTerm());
1293 assertEquals(2, followerActorContext.getCommitIndex());
// Verifies how the Leader catches up a follower whose log is shorter than its own. After the
// leader handles the follower's reply to the initial (empty) AppendEntries, it sends a second
// AppendEntries with prevLogIndex -1 containing the leader's entries at indices 1 and 2; the
// follower then emits two ApplyState messages and ends with commit index 2 and last index 2.
// Per the test name the first reply is expected to be a failure; isSuccess() is not asserted.
1299 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1300 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1302 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// Very long heartbeat interval - presumably so that no periodic heartbeat interferes with
// the message counts asserted below (TODO confirm).
1303 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1304 new FiniteDuration(1000, TimeUnit.SECONDS));
1306 leaderActorContext.setReplicatedLog(
1307 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1308 long leaderCommitIndex = 2;
1309 leaderActorContext.setCommitIndex(leaderCommitIndex);
1310 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the leader's entries that the follower is expected to receive.
1312 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1313 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1315 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// Follower log built with createEntries(0, 1, 1), with commit and last-applied at index 0.
1317 followerActorContext.setReplicatedLog(
1318 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1319 followerActorContext.setCommitIndex(0);
1320 followerActorContext.setLastApplied(0);
1322 Follower follower = new Follower(followerActorContext);
1323 followerActor.underlyingActor().setBehavior(follower);
1325 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply, then reset both collectors.
1327 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1328 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1330 MessageCollectorActor.clearMessages(followerActor);
1331 MessageCollectorActor.clearMessages(leaderActor);
1333 // Verify initial AppendEntries sent with the leader's current commit index.
1334 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1335 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1336 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
// From here on leaderActor dispatches to the Leader, and the captured reply is handed to it.
1338 leaderActor.underlyingActor().setBehavior(leader);
1340 leader.handleMessage(followerActor, appendEntriesReply);
// Expect one reply back at the leader and a follow-up AppendEntries at the follower.
1342 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1343 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1345 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1346 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1347 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1349 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1350 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1351 appendEntries.getEntries().get(0).getData());
1352 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1353 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1354 appendEntries.getEntries().get(1).getData());
1356 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1357 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
// The follower should have applied both newly received entries.
1359 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1361 ApplyState applyState = applyStateList.get(0);
1362 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1363 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1364 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1365 applyState.getReplicatedLogEntry().getData());
1367 applyState = applyStateList.get(1);
1368 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1369 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1370 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1371 applyState.getReplicatedLogEntry().getData());
1373 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1374 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
// Verifies how the Leader catches up a follower that starts with an empty log. After the
// leader handles the follower's reply to the initial (empty) AppendEntries, it sends a second
// AppendEntries with prevLogIndex -1 containing the leader's entries at indices 0 and 1; the
// follower then emits two ApplyState messages and ends with commit index 1 and last index 1.
// Per the test name the first reply is expected to be a failure; isSuccess() is not asserted.
1378 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1379 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1381 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// Very long heartbeat interval - presumably so that no periodic heartbeat interferes with
// the message counts asserted below (TODO confirm).
1382 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1383 new FiniteDuration(1000, TimeUnit.SECONDS));
1385 leaderActorContext.setReplicatedLog(
1386 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1387 long leaderCommitIndex = 1;
1388 leaderActorContext.setCommitIndex(leaderCommitIndex);
1389 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the leader's entries that the follower is expected to receive.
1391 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1392 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1394 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// Follower log built without any entries, with commit and last-applied at -1.
1396 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1397 followerActorContext.setCommitIndex(-1);
1398 followerActorContext.setLastApplied(-1);
1400 Follower follower = new Follower(followerActorContext);
1401 followerActor.underlyingActor().setBehavior(follower);
1403 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply, then reset both collectors.
1405 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1406 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1408 MessageCollectorActor.clearMessages(followerActor);
1409 MessageCollectorActor.clearMessages(leaderActor);
1411 // Verify initial AppendEntries sent with the leader's current commit index.
1412 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1413 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1414 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
// From here on leaderActor dispatches to the Leader, and the captured reply is handed to it.
1416 leaderActor.underlyingActor().setBehavior(leader);
1418 leader.handleMessage(followerActor, appendEntriesReply);
// Expect one reply back at the leader and a follow-up AppendEntries at the follower.
1420 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1421 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1423 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1424 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1425 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1427 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1428 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1429 appendEntries.getEntries().get(0).getData());
1430 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1431 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1432 appendEntries.getEntries().get(1).getData());
1434 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1435 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
// The follower should have applied both newly received entries.
1437 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1439 ApplyState applyState = applyStateList.get(0);
1440 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1441 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1442 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1443 applyState.getReplicatedLogEntry().getData());
1445 applyState = applyStateList.get(1);
1446 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1447 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1448 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1449 applyState.getReplicatedLogEntry().getData());
1451 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1452 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
// Verifies how the Leader repairs a follower whose log was built with a different term
// argument (1) than the leader's (2). After the leader handles the follower's reply to the
// initial (empty) AppendEntries, it sends a second AppendEntries with prevLogIndex -1
// containing its term-2 entries at indices 0 and 1; the follower then emits two ApplyState
// messages for term-2 entries and ends with commit index 1, last index 1 and last term 2.
// Per the test name the first reply is expected to be a failure; isSuccess() is not asserted.
1456 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1457 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1459 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// Very long heartbeat interval - presumably so that no periodic heartbeat interferes with
// the message counts asserted below (TODO confirm).
1460 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1461 new FiniteDuration(1000, TimeUnit.SECONDS));
1463 leaderActorContext.setReplicatedLog(
1464 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1465 long leaderCommitIndex = 1;
1466 leaderActorContext.setCommitIndex(leaderCommitIndex);
1467 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the leader's entries that the follower is expected to receive.
1469 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1470 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1472 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// Follower log built with createEntries(0, 1, 1), with commit and last-applied at -1.
1474 followerActorContext.setReplicatedLog(
1475 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1476 followerActorContext.setCommitIndex(-1);
1477 followerActorContext.setLastApplied(-1);
1479 Follower follower = new Follower(followerActorContext);
1480 followerActor.underlyingActor().setBehavior(follower);
1482 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply, then reset both collectors.
1484 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1485 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1487 MessageCollectorActor.clearMessages(followerActor);
1488 MessageCollectorActor.clearMessages(leaderActor);
1490 // Verify initial AppendEntries sent with the leader's current commit index.
1491 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1492 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1493 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
// From here on leaderActor dispatches to the Leader, and the captured reply is handed to it.
1495 leaderActor.underlyingActor().setBehavior(leader);
1497 leader.handleMessage(followerActor, appendEntriesReply);
// Expect one reply back at the leader and a follow-up AppendEntries at the follower.
1499 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1500 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1502 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1503 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1504 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1506 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1507 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1508 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1509 appendEntries.getEntries().get(0).getData());
1510 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1511 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1512 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1513 appendEntries.getEntries().get(1).getData());
1515 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1516 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
// The follower should have applied both of the leader's term-2 entries.
1518 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1520 ApplyState applyState = applyStateList.get(0);
1521 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1522 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1523 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1524 applyState.getReplicatedLogEntry().getData());
1526 applyState = applyStateList.get(1);
1527 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1528 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1529 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1530 applyState.getReplicatedLogEntry().getData());
1532 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1533 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1534 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
// Verifies that when leaderActor (running the Leader behavior) receives an unsuccessful
// AppendEntriesReply built with the values below, the first behavior change recorded by the
// actor is to RaftState.Follower.
1538 public void testHandleAppendEntriesReplyWithNewerTerm(){
1539 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1541 MockRaftActorContext leaderActorContext = createActorContext();
// Very long heartbeat interval - presumably to keep periodic heartbeats out of the test
// (TODO confirm).
1542 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1543 new FiniteDuration(10000, TimeUnit.SECONDS));
1545 leaderActorContext.setReplicatedLog(
1546 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1548 leader = new Leader(leaderActorContext);
1549 leaderActor.underlyingActor().setBehavior(leader);
// Per the test name, the 20 is presumably a term newer than the leader's current term;
// verify against the AppendEntriesReply constructor.
1550 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1552 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1554 assertEquals(false, appendEntriesReply.isSuccess());
1555 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1557 MessageCollectorActor.clearMessages(leaderActor);
1561 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1562 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1564 MockRaftActorContext leaderActorContext = createActorContext();
1565 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1566 new FiniteDuration(10000, TimeUnit.SECONDS));
1568 leaderActorContext.setReplicatedLog(
1569 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1570 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1572 leader = new Leader(leaderActorContext);
1573 leaderActor.underlyingActor().setBehavior(leader);
1574 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1576 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1578 assertEquals(false, appendEntriesReply.isSuccess());
1579 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1581 MessageCollectorActor.clearMessages(leaderActor);
1585 public void testHandleAppendEntriesReplySuccess() throws Exception {
1586 logStart("testHandleAppendEntriesReplySuccess");
1588 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1590 leaderActorContext.setReplicatedLog(
1591 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1593 leaderActorContext.setCommitIndex(1);
1594 leaderActorContext.setLastApplied(1);
1595 leaderActorContext.getTermInformation().update(1, "leader");
1597 leader = new Leader(leaderActorContext);
1599 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1601 short payloadVersion = 5;
1602 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1604 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1606 assertEquals(RaftState.Leader, raftActorBehavior.state());
1608 assertEquals(2, leaderActorContext.getCommitIndex());
1610 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1611 leaderActor, ApplyJournalEntries.class);
1613 assertEquals(2, leaderActorContext.getLastApplied());
1615 assertEquals(2, applyJournalEntries.getToIndex());
1617 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1620 assertEquals(1,applyStateList.size());
1622 ApplyState applyState = applyStateList.get(0);
1624 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1626 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1627 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1631 public void testHandleAppendEntriesReplyUnknownFollower(){
1632 logStart("testHandleAppendEntriesReplyUnknownFollower");
1634 MockRaftActorContext leaderActorContext = createActorContext();
1636 leader = new Leader(leaderActorContext);
1638 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1640 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1642 assertEquals(RaftState.Leader, raftActorBehavior.state());
1646 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1647 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1649 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1650 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1651 new FiniteDuration(1000, TimeUnit.SECONDS));
1652 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1654 leaderActorContext.setReplicatedLog(
1655 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1656 long leaderCommitIndex = 3;
1657 leaderActorContext.setCommitIndex(leaderCommitIndex);
1658 leaderActorContext.setLastApplied(leaderCommitIndex);
1660 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1661 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1662 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1663 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1665 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1667 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1668 followerActorContext.setCommitIndex(-1);
1669 followerActorContext.setLastApplied(-1);
1671 Follower follower = new Follower(followerActorContext);
1672 followerActor.underlyingActor().setBehavior(follower);
1674 leader = new Leader(leaderActorContext);
1676 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1677 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1679 MessageCollectorActor.clearMessages(followerActor);
1680 MessageCollectorActor.clearMessages(leaderActor);
1682 // Verify initial AppendEntries sent with the leader's current commit index.
1683 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1684 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1685 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1687 leaderActor.underlyingActor().setBehavior(leader);
1689 leader.handleMessage(followerActor, appendEntriesReply);
1691 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1692 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1694 appendEntries = appendEntriesList.get(0);
1695 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1696 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1697 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1699 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1700 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1701 appendEntries.getEntries().get(0).getData());
1702 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1703 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1704 appendEntries.getEntries().get(1).getData());
1706 appendEntries = appendEntriesList.get(1);
1707 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1708 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1709 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1711 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1712 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1713 appendEntries.getEntries().get(0).getData());
1714 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1715 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1716 appendEntries.getEntries().get(1).getData());
1718 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1719 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1721 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1723 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1724 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1728 public void testHandleRequestVoteReply(){
1729 logStart("testHandleRequestVoteReply");
1731 MockRaftActorContext leaderActorContext = createActorContext();
1733 leader = new Leader(leaderActorContext);
1735 // Should be a no-op.
1736 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1737 new RequestVoteReply(1, true));
1739 assertEquals(RaftState.Leader, raftActorBehavior.state());
1741 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1743 assertEquals(RaftState.Leader, raftActorBehavior.state());
1747 public void testIsolatedLeaderCheckNoFollowers() {
1748 logStart("testIsolatedLeaderCheckNoFollowers");
1750 MockRaftActorContext leaderActorContext = createActorContext();
1752 leader = new Leader(leaderActorContext);
1753 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1754 Assert.assertTrue(behavior instanceof Leader);
1757 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1758 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1759 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1761 MockRaftActorContext leaderActorContext = createActorContext();
1763 Map<String, String> peerAddresses = new HashMap<>();
1764 peerAddresses.put("follower-1", followerActor1.path().toString());
1765 peerAddresses.put("follower-2", followerActor2.path().toString());
1767 leaderActorContext.setPeerAddresses(peerAddresses);
1768 leaderActorContext.setRaftPolicy(raftPolicy);
1770 leader = new Leader(leaderActorContext);
1772 leader.markFollowerActive("follower-1");
1773 leader.markFollowerActive("follower-2");
1774 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1775 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1776 behavior instanceof Leader);
1778 // kill 1 follower and verify if that got killed
1779 final JavaTestKit probe = new JavaTestKit(getSystem());
1780 probe.watch(followerActor1);
1781 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1782 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1783 assertEquals(termMsg1.getActor(), followerActor1);
1785 leader.markFollowerInActive("follower-1");
1786 leader.markFollowerActive("follower-2");
1787 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1788 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1789 behavior instanceof Leader);
1791 // kill 2nd follower and leader should change to Isolated leader
1792 followerActor2.tell(PoisonPill.getInstance(), null);
1793 probe.watch(followerActor2);
1794 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1795 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1796 assertEquals(termMsg2.getActor(), followerActor2);
1798 leader.markFollowerInActive("follower-2");
1799 return leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1803 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1804 logStart("testIsolatedLeaderCheckTwoFollowers");
1806 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1808 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1809 behavior instanceof IsolatedLeader);
1813 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1814 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1816 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1818 Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1819 behavior instanceof Leader);
1823 public void testLaggingFollowerStarvation() throws Exception {
1824 logStart("testLaggingFollowerStarvation");
1825 new JavaTestKit(getSystem()) {{
1826 String leaderActorId = actorFactory.generateActorId("leader");
1827 String follower1ActorId = actorFactory.generateActorId("follower");
1828 String follower2ActorId = actorFactory.generateActorId("follower");
1830 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1831 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1832 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1833 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1835 MockRaftActorContext leaderActorContext =
1836 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1838 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1839 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1840 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1842 leaderActorContext.setConfigParams(configParams);
1844 leaderActorContext.setReplicatedLog(
1845 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1847 Map<String, String> peerAddresses = new HashMap<>();
1848 peerAddresses.put(follower1ActorId,
1849 follower1Actor.path().toString());
1850 peerAddresses.put(follower2ActorId,
1851 follower2Actor.path().toString());
1853 leaderActorContext.setPeerAddresses(peerAddresses);
1854 leaderActorContext.getTermInformation().update(1, leaderActorId);
1856 RaftActorBehavior leader = createBehavior(leaderActorContext);
1858 leaderActor.underlyingActor().setBehavior(leader);
1860 for(int i=1;i<6;i++) {
1861 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1862 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1863 assertTrue(newBehavior == leader);
1864 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1867 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1868 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1870 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1871 heartbeats.size() > 1);
1873 // Check if follower-2 got AppendEntries during this time and was not starved
1874 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1876 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1877 appendEntries.size() > 1);
1883 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1884 ActorRef actorRef, RaftRPC rpc) throws Exception {
1885 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1886 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1889 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1891 private final long electionTimeOutIntervalMillis;
1892 private final int snapshotChunkSize;
1894 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1896 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1897 this.snapshotChunkSize = snapshotChunkSize;
1901 public FiniteDuration getElectionTimeOutInterval() {
1902 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1906 public int getSnapshotChunkSize() {
1907 return snapshotChunkSize;