2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.actor.Terminated;
18 import akka.testkit.JavaTestKit;
19 import akka.testkit.TestActorRef;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.util.concurrent.Uninterruptibles;
22 import com.google.protobuf.ByteString;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.List;
27 import java.util.concurrent.TimeUnit;
28 import org.junit.After;
29 import org.junit.Assert;
30 import org.junit.Test;
31 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
32 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
33 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
34 import org.opendaylight.controller.cluster.raft.RaftActorContext;
35 import org.opendaylight.controller.cluster.raft.RaftState;
36 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
37 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
38 import org.opendaylight.controller.cluster.raft.SerializationUtils;
39 import org.opendaylight.controller.cluster.raft.Snapshot;
40 import org.opendaylight.controller.cluster.raft.VotingState;
41 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
42 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
43 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
44 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
45 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
46 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
47 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
48 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
49 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
50 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
51 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
52 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
53 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
54 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
55 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
56 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
57 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
58 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
59 import scala.concurrent.duration.FiniteDuration;
61 public class LeaderTest extends AbstractLeaderTest {
63 static final String FOLLOWER_ID = "follower";
64 public static final String LEADER_ID = "leader";
66 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
67 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
69 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
70 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
72 private Leader leader;
73 private final short payloadVersion = 5;
77 public void tearDown() throws Exception {
86 public void testHandleMessageForUnknownMessage() throws Exception {
87 logStart("testHandleMessageForUnknownMessage");
89 leader = new Leader(createActorContext());
91 // handle message should return the Leader state when it receives an
93 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
94 Assert.assertTrue(behavior instanceof Leader);
98 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
99 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
101 MockRaftActorContext actorContext = createActorContextWithFollower();
102 short payloadVersion = (short)5;
103 actorContext.setPayloadVersion(payloadVersion);
106 actorContext.getTermInformation().update(term, "");
108 leader = new Leader(actorContext);
110 // Leader should send an immediate heartbeat with no entries as follower is inactive.
111 long lastIndex = actorContext.getReplicatedLog().lastIndex();
112 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
113 assertEquals("getTerm", term, appendEntries.getTerm());
114 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
115 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
116 assertEquals("Entries size", 0, appendEntries.getEntries().size());
117 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
119 // The follower would normally reply - simulate that explicitly here.
120 leader.handleMessage(followerActor, new AppendEntriesReply(
121 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
122 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
124 followerActor.underlyingActor().clear();
126 // Sleep for the heartbeat interval so AppendEntries is sent.
127 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
128 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
130 leader.handleMessage(leaderActor, new SendHeartBeat());
132 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
133 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
134 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
135 assertEquals("Entries size", 1, appendEntries.getEntries().size());
136 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
137 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
138 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
142 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
143 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
144 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
146 actorContext.getReplicatedLog().append(newEntry);
147 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
151 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
152 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
154 MockRaftActorContext actorContext = createActorContextWithFollower();
157 actorContext.getTermInformation().update(term, "");
159 leader = new Leader(actorContext);
161 // Leader will send an immediate heartbeat - ignore it.
162 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
164 // The follower would normally reply - simulate that explicitly here.
165 long lastIndex = actorContext.getReplicatedLog().lastIndex();
166 leader.handleMessage(followerActor, new AppendEntriesReply(
167 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
168 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
170 followerActor.underlyingActor().clear();
172 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
174 // State should not change
175 assertTrue(raftBehavior instanceof Leader);
177 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
178 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
179 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
180 assertEquals("Entries size", 1, appendEntries.getEntries().size());
181 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
182 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
183 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
184 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
188 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
189 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
191 MockRaftActorContext actorContext = createActorContextWithFollower();
192 actorContext.setRaftPolicy(createRaftPolicy(true, true));
195 actorContext.getTermInformation().update(term, "");
197 leader = new Leader(actorContext);
199 // Leader will send an immediate heartbeat - ignore it.
200 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
202 // The follower would normally reply - simulate that explicitly here.
203 long lastIndex = actorContext.getReplicatedLog().lastIndex();
204 leader.handleMessage(followerActor, new AppendEntriesReply(
205 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
206 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
208 followerActor.underlyingActor().clear();
210 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
212 // State should not change
213 assertTrue(raftBehavior instanceof Leader);
215 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
216 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
217 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
218 assertEquals("Entries size", 1, appendEntries.getEntries().size());
219 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
220 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
221 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
222 assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
226 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
227 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
229 MockRaftActorContext actorContext = createActorContextWithFollower();
230 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
232 public FiniteDuration getHeartBeatInterval() {
233 return FiniteDuration.apply(5, TimeUnit.SECONDS);
238 actorContext.getTermInformation().update(term, "");
240 leader = new Leader(actorContext);
242 // Leader will send an immediate heartbeat - ignore it.
243 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
245 // The follower would normally reply - simulate that explicitly here.
246 long lastIndex = actorContext.getReplicatedLog().lastIndex();
247 leader.handleMessage(followerActor, new AppendEntriesReply(
248 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
249 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
251 followerActor.underlyingActor().clear();
253 for(int i=0;i<5;i++) {
254 sendReplicate(actorContext, lastIndex+i+1);
257 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
258 // We expect only 1 message to be sent because of two reasons,
259 // - an append entries reply was not received
260 // - the heartbeat interval has not expired
261 // In this scenario if multiple messages are sent they would likely be duplicates
262 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
266 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
267 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
269 MockRaftActorContext actorContext = createActorContextWithFollower();
270 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
272 public FiniteDuration getHeartBeatInterval() {
273 return FiniteDuration.apply(5, TimeUnit.SECONDS);
278 actorContext.getTermInformation().update(term, "");
280 leader = new Leader(actorContext);
282 // Leader will send an immediate heartbeat - ignore it.
283 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
285 // The follower would normally reply - simulate that explicitly here.
286 long lastIndex = actorContext.getReplicatedLog().lastIndex();
287 leader.handleMessage(followerActor, new AppendEntriesReply(
288 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
289 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
291 followerActor.underlyingActor().clear();
293 for(int i=0;i<3;i++) {
294 sendReplicate(actorContext, lastIndex+i+1);
295 leader.handleMessage(followerActor, new AppendEntriesReply(
296 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
300 for(int i=3;i<5;i++) {
301 sendReplicate(actorContext, lastIndex + i + 1);
304 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
305 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
306 // get sent to the follower - but not the 5th
307 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
309 for(int i=0;i<4;i++) {
310 long expected = allMessages.get(i).getEntries().get(0).getIndex();
311 assertEquals(expected, i+2);
316 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
317 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
319 MockRaftActorContext actorContext = createActorContextWithFollower();
320 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
322 public FiniteDuration getHeartBeatInterval() {
323 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
328 actorContext.getTermInformation().update(term, "");
330 leader = new Leader(actorContext);
332 // Leader will send an immediate heartbeat - ignore it.
333 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
335 // The follower would normally reply - simulate that explicitly here.
336 long lastIndex = actorContext.getReplicatedLog().lastIndex();
337 leader.handleMessage(followerActor, new AppendEntriesReply(
338 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
339 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
341 followerActor.underlyingActor().clear();
343 sendReplicate(actorContext, lastIndex+1);
345 // Wait slightly longer than heartbeat duration
346 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
348 leader.handleMessage(leaderActor, new SendHeartBeat());
350 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
351 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
353 assertEquals(1, allMessages.get(0).getEntries().size());
354 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
355 assertEquals(1, allMessages.get(1).getEntries().size());
356 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
361 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
362 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
364 MockRaftActorContext actorContext = createActorContextWithFollower();
365 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
367 public FiniteDuration getHeartBeatInterval() {
368 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
373 actorContext.getTermInformation().update(term, "");
375 leader = new Leader(actorContext);
377 // Leader will send an immediate heartbeat - ignore it.
378 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
380 // The follower would normally reply - simulate that explicitly here.
381 long lastIndex = actorContext.getReplicatedLog().lastIndex();
382 leader.handleMessage(followerActor, new AppendEntriesReply(
383 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
384 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
386 followerActor.underlyingActor().clear();
388 for(int i=0;i<3;i++) {
389 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
390 leader.handleMessage(leaderActor, new SendHeartBeat());
393 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
394 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
398 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
399 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
401 MockRaftActorContext actorContext = createActorContextWithFollower();
402 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
404 public FiniteDuration getHeartBeatInterval() {
405 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
410 actorContext.getTermInformation().update(term, "");
412 leader = new Leader(actorContext);
414 // Leader will send an immediate heartbeat - ignore it.
415 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
417 // The follower would normally reply - simulate that explicitly here.
418 long lastIndex = actorContext.getReplicatedLog().lastIndex();
419 leader.handleMessage(followerActor, new AppendEntriesReply(
420 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
421 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
423 followerActor.underlyingActor().clear();
425 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
426 leader.handleMessage(leaderActor, new SendHeartBeat());
427 sendReplicate(actorContext, lastIndex+1);
429 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
430 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
432 assertEquals(0, allMessages.get(0).getEntries().size());
433 assertEquals(1, allMessages.get(1).getEntries().size());
438 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
439 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
441 MockRaftActorContext actorContext = createActorContext();
443 leader = new Leader(actorContext);
445 actorContext.setLastApplied(0);
447 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
448 long term = actorContext.getTermInformation().getCurrentTerm();
449 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
450 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
452 actorContext.getReplicatedLog().append(newEntry);
454 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
455 new Replicate(leaderActor, "state-id", newEntry));
457 // State should not change
458 assertTrue(raftBehavior instanceof Leader);
460 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
462 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
463 // one since lastApplied state is 0.
464 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
465 leaderActor, ApplyState.class);
466 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
468 for(int i = 0; i <= newLogIndex - 1; i++ ) {
469 ApplyState applyState = applyStateList.get(i);
470 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
471 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
474 ApplyState last = applyStateList.get((int) newLogIndex - 1);
475 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
476 assertEquals("getIdentifier", "state-id", last.getIdentifier());
480 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
481 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
483 MockRaftActorContext actorContext = createActorContextWithFollower();
485 Map<String, String> leadersSnapshot = new HashMap<>();
486 leadersSnapshot.put("1", "A");
487 leadersSnapshot.put("2", "B");
488 leadersSnapshot.put("3", "C");
491 actorContext.getReplicatedLog().removeFrom(0);
493 final int commitIndex = 3;
494 final int snapshotIndex = 2;
495 final int newEntryIndex = 4;
496 final int snapshotTerm = 1;
497 final int currentTerm = 2;
499 // set the snapshot variables in replicatedlog
500 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
501 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
502 actorContext.setCommitIndex(commitIndex);
503 //set follower timeout to 2 mins, helps during debugging
504 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
506 leader = new Leader(actorContext);
508 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
509 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
512 ReplicatedLogImplEntry entry =
513 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
514 new MockRaftActorContext.MockPayload("D"));
516 //update follower timestamp
517 leader.markFollowerActive(FOLLOWER_ID);
519 ByteString bs = toByteString(leadersSnapshot);
520 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
521 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
522 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
523 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
525 //send first chunk and no InstallSnapshotReply received yet
527 fts.incrementChunkIndex();
529 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
530 TimeUnit.MILLISECONDS);
532 leader.handleMessage(leaderActor, new SendHeartBeat());
534 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
536 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
538 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
540 //InstallSnapshotReply received
541 fts.markSendStatus(true);
543 leader.handleMessage(leaderActor, new SendHeartBeat());
545 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
547 assertEquals(commitIndex, is.getLastIncludedIndex());
551 public void testSendAppendEntriesSnapshotScenario() throws Exception {
552 logStart("testSendAppendEntriesSnapshotScenario");
554 MockRaftActorContext actorContext = createActorContextWithFollower();
556 Map<String, String> leadersSnapshot = new HashMap<>();
557 leadersSnapshot.put("1", "A");
558 leadersSnapshot.put("2", "B");
559 leadersSnapshot.put("3", "C");
562 actorContext.getReplicatedLog().removeFrom(0);
564 final int followersLastIndex = 2;
565 final int snapshotIndex = 3;
566 final int newEntryIndex = 4;
567 final int snapshotTerm = 1;
568 final int currentTerm = 2;
570 // set the snapshot variables in replicatedlog
571 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
572 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
573 actorContext.setCommitIndex(followersLastIndex);
575 leader = new Leader(actorContext);
577 // Leader will send an immediate heartbeat - ignore it.
578 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
581 ReplicatedLogImplEntry entry =
582 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
583 new MockRaftActorContext.MockPayload("D"));
585 actorContext.getReplicatedLog().append(entry);
587 //update follower timestamp
588 leader.markFollowerActive(FOLLOWER_ID);
590 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
591 RaftActorBehavior raftBehavior = leader.handleMessage(
592 leaderActor, new Replicate(null, "state-id", entry));
594 assertTrue(raftBehavior instanceof Leader);
596 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
600 public void testInitiateInstallSnapshot() throws Exception {
601 logStart("testInitiateInstallSnapshot");
603 MockRaftActorContext actorContext = createActorContextWithFollower();
606 actorContext.getReplicatedLog().removeFrom(0);
608 final int followersLastIndex = 2;
609 final int snapshotIndex = 3;
610 final int newEntryIndex = 4;
611 final int snapshotTerm = 1;
612 final int currentTerm = 2;
614 // set the snapshot variables in replicatedlog
615 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
616 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
617 actorContext.setLastApplied(3);
618 actorContext.setCommitIndex(followersLastIndex);
620 leader = new Leader(actorContext);
622 // Leader will send an immediate heartbeat - ignore it.
623 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
625 // set the snapshot as absent and check if capture-snapshot is invoked.
626 leader.setSnapshot(null);
629 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
630 new MockRaftActorContext.MockPayload("D"));
632 actorContext.getReplicatedLog().append(entry);
634 //update follower timestamp
635 leader.markFollowerActive(FOLLOWER_ID);
637 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
639 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
641 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
643 assertTrue(cs.isInstallSnapshotInitiated());
644 assertEquals(3, cs.getLastAppliedIndex());
645 assertEquals(1, cs.getLastAppliedTerm());
646 assertEquals(4, cs.getLastIndex());
647 assertEquals(2, cs.getLastTerm());
649 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
650 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
652 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
656 public void testInitiateForceInstallSnapshot() throws Exception {
657 logStart("testInitiateForceInstallSnapshot");
659 MockRaftActorContext actorContext = createActorContextWithFollower();
661 final int followersLastIndex = 2;
662 final int snapshotIndex = -1;
663 final int newEntryIndex = 4;
664 final int snapshotTerm = -1;
665 final int currentTerm = 2;
667 // set the snapshot variables in replicatedlog
668 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
669 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
670 actorContext.setLastApplied(3);
671 actorContext.setCommitIndex(followersLastIndex);
673 actorContext.getReplicatedLog().removeFrom(0);
675 leader = new Leader(actorContext);
677 // Leader will send an immediate heartbeat - ignore it.
678 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
680 // set the snapshot as absent and check if capture-snapshot is invoked.
681 leader.setSnapshot(null);
683 for(int i=0;i<4;i++) {
684 actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
685 new MockRaftActorContext.MockPayload("X" + i)));
689 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
690 new MockRaftActorContext.MockPayload("D"));
692 actorContext.getReplicatedLog().append(entry);
694 //update follower timestamp
695 leader.markFollowerActive(FOLLOWER_ID);
697 // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
698 // installed with a SendInstallSnapshot
699 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
701 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
703 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
705 assertTrue(cs.isInstallSnapshotInitiated());
706 assertEquals(3, cs.getLastAppliedIndex());
707 assertEquals(1, cs.getLastAppliedTerm());
708 assertEquals(4, cs.getLastIndex());
709 assertEquals(2, cs.getLastTerm());
711 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
712 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
714 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
719 public void testInstallSnapshot() throws Exception {
720 logStart("testInstallSnapshot");
722 MockRaftActorContext actorContext = createActorContextWithFollower();
724 Map<String, String> leadersSnapshot = new HashMap<>();
725 leadersSnapshot.put("1", "A");
726 leadersSnapshot.put("2", "B");
727 leadersSnapshot.put("3", "C");
730 actorContext.getReplicatedLog().removeFrom(0);
732 final int lastAppliedIndex = 3;
733 final int snapshotIndex = 2;
734 final int snapshotTerm = 1;
735 final int currentTerm = 2;
737 // set the snapshot variables in replicatedlog
738 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
739 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
740 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
741 actorContext.setCommitIndex(lastAppliedIndex);
742 actorContext.setLastApplied(lastAppliedIndex);
744 leader = new Leader(actorContext);
746 // Initial heartbeat.
747 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
749 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
750 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
752 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
753 Collections.<ReplicatedLogEntry>emptyList(),
754 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
756 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
758 assertTrue(raftBehavior instanceof Leader);
760 // check if installsnapshot gets called with the correct values.
762 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
764 assertNotNull(installSnapshot.getData());
765 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
766 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
768 assertEquals(currentTerm, installSnapshot.getTerm());
772 public void testForceInstallSnapshot() throws Exception {
773 logStart("testForceInstallSnapshot");
775 MockRaftActorContext actorContext = createActorContextWithFollower();
777 Map<String, String> leadersSnapshot = new HashMap<>();
778 leadersSnapshot.put("1", "A");
779 leadersSnapshot.put("2", "B");
780 leadersSnapshot.put("3", "C");
782 final int lastAppliedIndex = 3;
783 final int snapshotIndex = -1;
784 final int snapshotTerm = -1;
785 final int currentTerm = 2;
787 // set the snapshot variables in replicatedlog
788 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
789 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
790 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
791 actorContext.setCommitIndex(lastAppliedIndex);
792 actorContext.setLastApplied(lastAppliedIndex);
794 leader = new Leader(actorContext);
796 // Initial heartbeat.
797 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
799 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
800 leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
802 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
803 Collections.<ReplicatedLogEntry>emptyList(),
804 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
806 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
808 assertTrue(raftBehavior instanceof Leader);
810 // check if installsnapshot gets called with the correct values.
812 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
814 assertNotNull(installSnapshot.getData());
815 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
816 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
818 assertEquals(currentTerm, installSnapshot.getTerm());
822 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
823 logStart("testHandleInstallSnapshotReplyLastChunk");
825 MockRaftActorContext actorContext = createActorContextWithFollower();
827 final int commitIndex = 3;
828 final int snapshotIndex = 2;
829 final int snapshotTerm = 1;
830 final int currentTerm = 2;
832 actorContext.setCommitIndex(commitIndex);
834 leader = new Leader(actorContext);
836 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
837 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
839 // Ignore initial heartbeat.
840 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
842 Map<String, String> leadersSnapshot = new HashMap<>();
843 leadersSnapshot.put("1", "A");
844 leadersSnapshot.put("2", "B");
845 leadersSnapshot.put("3", "C");
847 // set the snapshot variables in replicatedlog
849 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
850 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
851 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
853 ByteString bs = toByteString(leadersSnapshot);
854 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
855 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
856 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
857 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
858 while(!fts.isLastChunk(fts.getChunkIndex())) {
860 fts.incrementChunkIndex();
864 actorContext.getReplicatedLog().removeFrom(0);
866 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
867 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
869 assertTrue(raftBehavior instanceof Leader);
871 assertEquals(0, leader.followerSnapshotSize());
872 assertEquals(1, leader.followerLogSize());
873 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
875 assertEquals(commitIndex, fli.getMatchIndex());
876 assertEquals(commitIndex + 1, fli.getNextIndex());
880 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
881 logStart("testSendSnapshotfromInstallSnapshotReply");
883 MockRaftActorContext actorContext = createActorContextWithFollower();
885 final int commitIndex = 3;
886 final int snapshotIndex = 2;
887 final int snapshotTerm = 1;
888 final int currentTerm = 2;
890 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
892 public int getSnapshotChunkSize() {
896 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
897 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
899 actorContext.setConfigParams(configParams);
900 actorContext.setCommitIndex(commitIndex);
902 leader = new Leader(actorContext);
904 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
905 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
907 Map<String, String> leadersSnapshot = new HashMap<>();
908 leadersSnapshot.put("1", "A");
909 leadersSnapshot.put("2", "B");
910 leadersSnapshot.put("3", "C");
912 // set the snapshot variables in replicatedlog
913 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
914 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
915 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
917 ByteString bs = toByteString(leadersSnapshot);
918 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
919 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
920 leader.setSnapshot(snapshot);
922 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
924 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
926 assertEquals(1, installSnapshot.getChunkIndex());
927 assertEquals(3, installSnapshot.getTotalChunks());
929 followerActor.underlyingActor().clear();
930 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
931 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
933 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
935 assertEquals(2, installSnapshot.getChunkIndex());
936 assertEquals(3, installSnapshot.getTotalChunks());
938 followerActor.underlyingActor().clear();
939 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
940 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
942 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
944 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
945 followerActor.underlyingActor().clear();
946 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
947 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
949 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
951 Assert.assertNull(installSnapshot);
956 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
957 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
959 MockRaftActorContext actorContext = createActorContextWithFollower();
961 final int commitIndex = 3;
962 final int snapshotIndex = 2;
963 final int snapshotTerm = 1;
964 final int currentTerm = 2;
966 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
968 public int getSnapshotChunkSize() {
973 actorContext.setCommitIndex(commitIndex);
975 leader = new Leader(actorContext);
977 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
978 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
980 Map<String, String> leadersSnapshot = new HashMap<>();
981 leadersSnapshot.put("1", "A");
982 leadersSnapshot.put("2", "B");
983 leadersSnapshot.put("3", "C");
985 // set the snapshot variables in replicatedlog
986 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
987 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
988 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
990 ByteString bs = toByteString(leadersSnapshot);
991 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
992 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
993 leader.setSnapshot(snapshot);
995 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
996 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
998 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1000 assertEquals(1, installSnapshot.getChunkIndex());
1001 assertEquals(3, installSnapshot.getTotalChunks());
1003 followerActor.underlyingActor().clear();
1005 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1006 FOLLOWER_ID, -1, false));
1008 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1009 TimeUnit.MILLISECONDS);
1011 leader.handleMessage(leaderActor, new SendHeartBeat());
1013 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1015 assertEquals(1, installSnapshot.getChunkIndex());
1016 assertEquals(3, installSnapshot.getTotalChunks());
1020 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1021 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1023 MockRaftActorContext actorContext = createActorContextWithFollower();
1025 final int commitIndex = 3;
1026 final int snapshotIndex = 2;
1027 final int snapshotTerm = 1;
1028 final int currentTerm = 2;
1030 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1032 public int getSnapshotChunkSize() {
1037 actorContext.setCommitIndex(commitIndex);
1039 leader = new Leader(actorContext);
1041 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1042 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1044 Map<String, String> leadersSnapshot = new HashMap<>();
1045 leadersSnapshot.put("1", "A");
1046 leadersSnapshot.put("2", "B");
1047 leadersSnapshot.put("3", "C");
1049 // set the snapshot variables in replicatedlog
1050 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1051 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1052 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1054 ByteString bs = toByteString(leadersSnapshot);
1055 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1056 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1057 leader.setSnapshot(snapshot);
1059 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1061 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1063 assertEquals(1, installSnapshot.getChunkIndex());
1064 assertEquals(3, installSnapshot.getTotalChunks());
1065 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
1067 int hashCode = installSnapshot.getData().hashCode();
1069 followerActor.underlyingActor().clear();
1071 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1072 FOLLOWER_ID, 1, true));
1074 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1076 assertEquals(2, installSnapshot.getChunkIndex());
1077 assertEquals(3, installSnapshot.getTotalChunks());
1078 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1082 public void testFollowerToSnapshotLogic() {
1083 logStart("testFollowerToSnapshotLogic");
1085 MockRaftActorContext actorContext = createActorContext();
1087 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1089 public int getSnapshotChunkSize() {
1094 leader = new Leader(actorContext);
1096 Map<String, String> leadersSnapshot = new HashMap<>();
1097 leadersSnapshot.put("1", "A");
1098 leadersSnapshot.put("2", "B");
1099 leadersSnapshot.put("3", "C");
1101 ByteString bs = toByteString(leadersSnapshot);
1102 byte[] barray = bs.toByteArray();
1104 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1105 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1107 assertEquals(bs.size(), barray.length);
1110 for (int i=0; i < barray.length; i = i + 50) {
1114 if (i + 50 > barray.length) {
1118 ByteString chunk = fts.getNextChunk();
1119 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
1120 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1122 fts.markSendStatus(true);
1123 if (!fts.isLastChunk(chunkIndex)) {
1124 fts.incrementChunkIndex();
1128 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1131 @Override protected RaftActorBehavior createBehavior(
1132 RaftActorContext actorContext) {
1133 return new Leader(actorContext);
1137 protected MockRaftActorContext createActorContext() {
1138 return createActorContext(leaderActor);
1142 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1143 return createActorContext(LEADER_ID, actorRef);
1146 private MockRaftActorContext createActorContextWithFollower() {
1147 MockRaftActorContext actorContext = createActorContext();
1148 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1149 followerActor.path().toString()).build());
1150 return actorContext;
1153 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1154 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1155 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1156 configParams.setElectionTimeoutFactor(100000);
1157 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1158 context.setConfigParams(configParams);
1159 context.setPayloadVersion(payloadVersion);
1163 private MockRaftActorContext createFollowerActorContextWithLeader() {
1164 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1165 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1166 followerConfig.setElectionTimeoutFactor(10000);
1167 followerActorContext.setConfigParams(followerConfig);
1168 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1169 return followerActorContext;
1173 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1174 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1176 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1178 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1180 Follower follower = new Follower(followerActorContext);
1181 followerActor.underlyingActor().setBehavior(follower);
1183 Map<String, String> peerAddresses = new HashMap<>();
1184 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1186 leaderActorContext.setPeerAddresses(peerAddresses);
1188 leaderActorContext.getReplicatedLog().removeFrom(0);
1191 leaderActorContext.setReplicatedLog(
1192 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1194 leaderActorContext.setCommitIndex(1);
1196 followerActorContext.getReplicatedLog().removeFrom(0);
1198 // follower too has the exact same log entries and has the same commit index
1199 followerActorContext.setReplicatedLog(
1200 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1202 followerActorContext.setCommitIndex(1);
1204 leader = new Leader(leaderActorContext);
1206 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1208 assertEquals(1, appendEntries.getLeaderCommit());
1209 assertEquals(0, appendEntries.getEntries().size());
1210 assertEquals(0, appendEntries.getPrevLogIndex());
1212 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1213 leaderActor, AppendEntriesReply.class);
1215 assertEquals(2, appendEntriesReply.getLogLastIndex());
1216 assertEquals(1, appendEntriesReply.getLogLastTerm());
1218 // follower returns its next index
1219 assertEquals(2, appendEntriesReply.getLogLastIndex());
1220 assertEquals(1, appendEntriesReply.getLogLastTerm());
1226 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1227 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1229 MockRaftActorContext leaderActorContext = createActorContext();
1231 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1232 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1234 Follower follower = new Follower(followerActorContext);
1235 followerActor.underlyingActor().setBehavior(follower);
1237 Map<String, String> leaderPeerAddresses = new HashMap<>();
1238 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1240 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1242 leaderActorContext.getReplicatedLog().removeFrom(0);
1244 leaderActorContext.setReplicatedLog(
1245 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1247 leaderActorContext.setCommitIndex(1);
1249 followerActorContext.getReplicatedLog().removeFrom(0);
1251 followerActorContext.setReplicatedLog(
1252 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1254 // follower has the same log entries but its commit index > leaders commit index
1255 followerActorContext.setCommitIndex(2);
1257 leader = new Leader(leaderActorContext);
1259 // Initial heartbeat
1260 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1262 assertEquals(1, appendEntries.getLeaderCommit());
1263 assertEquals(0, appendEntries.getEntries().size());
1264 assertEquals(0, appendEntries.getPrevLogIndex());
1266 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1267 leaderActor, AppendEntriesReply.class);
1269 assertEquals(2, appendEntriesReply.getLogLastIndex());
1270 assertEquals(1, appendEntriesReply.getLogLastTerm());
1272 leaderActor.underlyingActor().setBehavior(follower);
1273 leader.handleMessage(followerActor, appendEntriesReply);
1275 leaderActor.underlyingActor().clear();
1276 followerActor.underlyingActor().clear();
1278 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1279 TimeUnit.MILLISECONDS);
1281 leader.handleMessage(leaderActor, new SendHeartBeat());
1283 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1285 assertEquals(2, appendEntries.getLeaderCommit());
1286 assertEquals(0, appendEntries.getEntries().size());
1287 assertEquals(2, appendEntries.getPrevLogIndex());
1289 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1291 assertEquals(2, appendEntriesReply.getLogLastIndex());
1292 assertEquals(1, appendEntriesReply.getLogLastTerm());
1294 assertEquals(2, followerActorContext.getCommitIndex());
1300 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1301 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1303 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1304 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1305 new FiniteDuration(1000, TimeUnit.SECONDS));
1307 leaderActorContext.setReplicatedLog(
1308 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1309 long leaderCommitIndex = 2;
1310 leaderActorContext.setCommitIndex(leaderCommitIndex);
1311 leaderActorContext.setLastApplied(leaderCommitIndex);
1313 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1314 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1316 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1318 followerActorContext.setReplicatedLog(
1319 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1320 followerActorContext.setCommitIndex(0);
1321 followerActorContext.setLastApplied(0);
1323 Follower follower = new Follower(followerActorContext);
1324 followerActor.underlyingActor().setBehavior(follower);
1326 leader = new Leader(leaderActorContext);
1328 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1329 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1331 MessageCollectorActor.clearMessages(followerActor);
1332 MessageCollectorActor.clearMessages(leaderActor);
1334 // Verify initial AppendEntries sent with the leader's current commit index.
1335 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1336 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1337 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1339 leaderActor.underlyingActor().setBehavior(leader);
1341 leader.handleMessage(followerActor, appendEntriesReply);
1343 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1344 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1346 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1347 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1348 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1350 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1351 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1352 appendEntries.getEntries().get(0).getData());
1353 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1354 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1355 appendEntries.getEntries().get(1).getData());
1357 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1358 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1360 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1362 ApplyState applyState = applyStateList.get(0);
1363 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1364 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1365 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1366 applyState.getReplicatedLogEntry().getData());
1368 applyState = applyStateList.get(1);
1369 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1370 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1371 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1372 applyState.getReplicatedLogEntry().getData());
1374 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1375 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1379 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1380 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1382 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1383 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1384 new FiniteDuration(1000, TimeUnit.SECONDS));
1386 leaderActorContext.setReplicatedLog(
1387 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1388 long leaderCommitIndex = 1;
1389 leaderActorContext.setCommitIndex(leaderCommitIndex);
1390 leaderActorContext.setLastApplied(leaderCommitIndex);
1392 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1393 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1395 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1397 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1398 followerActorContext.setCommitIndex(-1);
1399 followerActorContext.setLastApplied(-1);
1401 Follower follower = new Follower(followerActorContext);
1402 followerActor.underlyingActor().setBehavior(follower);
1404 leader = new Leader(leaderActorContext);
1406 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1407 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1409 MessageCollectorActor.clearMessages(followerActor);
1410 MessageCollectorActor.clearMessages(leaderActor);
1412 // Verify initial AppendEntries sent with the leader's current commit index.
1413 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1414 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1415 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1417 leaderActor.underlyingActor().setBehavior(leader);
1419 leader.handleMessage(followerActor, appendEntriesReply);
1421 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1422 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1424 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1425 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1426 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1428 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1429 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1430 appendEntries.getEntries().get(0).getData());
1431 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1432 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1433 appendEntries.getEntries().get(1).getData());
1435 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1436 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1438 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1440 ApplyState applyState = applyStateList.get(0);
1441 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1442 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1443 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1444 applyState.getReplicatedLogEntry().getData());
1446 applyState = applyStateList.get(1);
1447 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1448 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1449 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1450 applyState.getReplicatedLogEntry().getData());
1452 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1453 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1457 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1458 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1460 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1461 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1462 new FiniteDuration(1000, TimeUnit.SECONDS));
1464 leaderActorContext.setReplicatedLog(
1465 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1466 long leaderCommitIndex = 1;
1467 leaderActorContext.setCommitIndex(leaderCommitIndex);
1468 leaderActorContext.setLastApplied(leaderCommitIndex);
1470 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1471 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1473 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1475 followerActorContext.setReplicatedLog(
1476 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1477 followerActorContext.setCommitIndex(-1);
1478 followerActorContext.setLastApplied(-1);
1480 Follower follower = new Follower(followerActorContext);
1481 followerActor.underlyingActor().setBehavior(follower);
1483 leader = new Leader(leaderActorContext);
1485 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1486 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1488 MessageCollectorActor.clearMessages(followerActor);
1489 MessageCollectorActor.clearMessages(leaderActor);
1491 // Verify initial AppendEntries sent with the leader's current commit index.
1492 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1493 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1494 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1496 leaderActor.underlyingActor().setBehavior(leader);
1498 leader.handleMessage(followerActor, appendEntriesReply);
1500 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1501 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1503 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1504 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1505 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1507 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1508 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1509 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1510 appendEntries.getEntries().get(0).getData());
1511 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1512 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1513 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1514 appendEntries.getEntries().get(1).getData());
1516 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1517 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1519 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1521 ApplyState applyState = applyStateList.get(0);
1522 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1523 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1524 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1525 applyState.getReplicatedLogEntry().getData());
1527 applyState = applyStateList.get(1);
1528 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1529 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1530 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1531 applyState.getReplicatedLogEntry().getData());
1533 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1534 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1535 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1539 public void testHandleAppendEntriesReplyWithNewerTerm(){
1540 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1542 MockRaftActorContext leaderActorContext = createActorContext();
1543 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1544 new FiniteDuration(10000, TimeUnit.SECONDS));
1546 leaderActorContext.setReplicatedLog(
1547 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1549 leader = new Leader(leaderActorContext);
1550 leaderActor.underlyingActor().setBehavior(leader);
1551 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1553 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1555 assertEquals(false, appendEntriesReply.isSuccess());
1556 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1558 MessageCollectorActor.clearMessages(leaderActor);
1562 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1563 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1565 MockRaftActorContext leaderActorContext = createActorContext();
1566 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1567 new FiniteDuration(10000, TimeUnit.SECONDS));
1569 leaderActorContext.setReplicatedLog(
1570 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1571 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1573 leader = new Leader(leaderActorContext);
1574 leaderActor.underlyingActor().setBehavior(leader);
1575 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1577 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1579 assertEquals(false, appendEntriesReply.isSuccess());
1580 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1582 MessageCollectorActor.clearMessages(leaderActor);
1586 public void testHandleAppendEntriesReplySuccess() throws Exception {
1587 logStart("testHandleAppendEntriesReplySuccess");
1589 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1591 leaderActorContext.setReplicatedLog(
1592 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1594 leaderActorContext.setCommitIndex(1);
1595 leaderActorContext.setLastApplied(1);
1596 leaderActorContext.getTermInformation().update(1, "leader");
1598 leader = new Leader(leaderActorContext);
1600 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1602 short payloadVersion = 5;
1603 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1605 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1607 assertEquals(RaftState.Leader, raftActorBehavior.state());
1609 assertEquals(2, leaderActorContext.getCommitIndex());
1611 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1612 leaderActor, ApplyJournalEntries.class);
1614 assertEquals(2, leaderActorContext.getLastApplied());
1616 assertEquals(2, applyJournalEntries.getToIndex());
1618 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1621 assertEquals(1,applyStateList.size());
1623 ApplyState applyState = applyStateList.get(0);
1625 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1627 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1628 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1632 public void testHandleAppendEntriesReplyUnknownFollower(){
1633 logStart("testHandleAppendEntriesReplyUnknownFollower");
1635 MockRaftActorContext leaderActorContext = createActorContext();
1637 leader = new Leader(leaderActorContext);
1639 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1641 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1643 assertEquals(RaftState.Leader, raftActorBehavior.state());
1647 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1648 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1650 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1651 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1652 new FiniteDuration(1000, TimeUnit.SECONDS));
1653 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1655 leaderActorContext.setReplicatedLog(
1656 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1657 long leaderCommitIndex = 3;
1658 leaderActorContext.setCommitIndex(leaderCommitIndex);
1659 leaderActorContext.setLastApplied(leaderCommitIndex);
1661 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1662 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1663 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1664 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1666 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1668 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1669 followerActorContext.setCommitIndex(-1);
1670 followerActorContext.setLastApplied(-1);
1672 Follower follower = new Follower(followerActorContext);
1673 followerActor.underlyingActor().setBehavior(follower);
1675 leader = new Leader(leaderActorContext);
1677 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1678 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1680 MessageCollectorActor.clearMessages(followerActor);
1681 MessageCollectorActor.clearMessages(leaderActor);
1683 // Verify initial AppendEntries sent with the leader's current commit index.
1684 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1685 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1686 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1688 leaderActor.underlyingActor().setBehavior(leader);
1690 leader.handleMessage(followerActor, appendEntriesReply);
1692 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1693 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1695 appendEntries = appendEntriesList.get(0);
1696 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1697 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1698 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1700 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1701 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1702 appendEntries.getEntries().get(0).getData());
1703 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1704 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1705 appendEntries.getEntries().get(1).getData());
1707 appendEntries = appendEntriesList.get(1);
1708 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1709 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1710 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1712 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1713 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1714 appendEntries.getEntries().get(0).getData());
1715 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1716 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1717 appendEntries.getEntries().get(1).getData());
1719 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1720 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1722 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1724 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1725 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1729 public void testHandleRequestVoteReply(){
1730 logStart("testHandleRequestVoteReply");
1732 MockRaftActorContext leaderActorContext = createActorContext();
1734 leader = new Leader(leaderActorContext);
1736 // Should be a no-op.
1737 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1738 new RequestVoteReply(1, true));
1740 assertEquals(RaftState.Leader, raftActorBehavior.state());
1742 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1744 assertEquals(RaftState.Leader, raftActorBehavior.state());
1748 public void testIsolatedLeaderCheckNoFollowers() {
1749 logStart("testIsolatedLeaderCheckNoFollowers");
1751 MockRaftActorContext leaderActorContext = createActorContext();
1753 leader = new Leader(leaderActorContext);
1754 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1755 Assert.assertTrue(behavior instanceof Leader);
1758 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1759 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1760 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1762 MockRaftActorContext leaderActorContext = createActorContext();
1764 Map<String, String> peerAddresses = new HashMap<>();
1765 peerAddresses.put("follower-1", followerActor1.path().toString());
1766 peerAddresses.put("follower-2", followerActor2.path().toString());
1768 leaderActorContext.setPeerAddresses(peerAddresses);
1769 leaderActorContext.setRaftPolicy(raftPolicy);
1771 leader = new Leader(leaderActorContext);
1773 leader.markFollowerActive("follower-1");
1774 leader.markFollowerActive("follower-2");
1775 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1776 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1777 behavior instanceof Leader);
1779 // kill 1 follower and verify if that got killed
1780 final JavaTestKit probe = new JavaTestKit(getSystem());
1781 probe.watch(followerActor1);
1782 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1783 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1784 assertEquals(termMsg1.getActor(), followerActor1);
1786 leader.markFollowerInActive("follower-1");
1787 leader.markFollowerActive("follower-2");
1788 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1789 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1790 behavior instanceof Leader);
1792 // kill 2nd follower and leader should change to Isolated leader
1793 followerActor2.tell(PoisonPill.getInstance(), null);
1794 probe.watch(followerActor2);
1795 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1796 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1797 assertEquals(termMsg2.getActor(), followerActor2);
1799 leader.markFollowerInActive("follower-2");
1800 return leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1804 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1805 logStart("testIsolatedLeaderCheckTwoFollowers");
1807 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1809 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1810 behavior instanceof IsolatedLeader);
1814 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1815 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1817 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1819 Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1820 behavior instanceof Leader);
1824 public void testLaggingFollowerStarvation() throws Exception {
1825 logStart("testLaggingFollowerStarvation");
1826 new JavaTestKit(getSystem()) {{
1827 String leaderActorId = actorFactory.generateActorId("leader");
1828 String follower1ActorId = actorFactory.generateActorId("follower");
1829 String follower2ActorId = actorFactory.generateActorId("follower");
1831 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1832 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1833 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1834 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1836 MockRaftActorContext leaderActorContext =
1837 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1839 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1840 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1841 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1843 leaderActorContext.setConfigParams(configParams);
1845 leaderActorContext.setReplicatedLog(
1846 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1848 Map<String, String> peerAddresses = new HashMap<>();
1849 peerAddresses.put(follower1ActorId,
1850 follower1Actor.path().toString());
1851 peerAddresses.put(follower2ActorId,
1852 follower2Actor.path().toString());
1854 leaderActorContext.setPeerAddresses(peerAddresses);
1855 leaderActorContext.getTermInformation().update(1, leaderActorId);
1857 RaftActorBehavior leader = createBehavior(leaderActorContext);
1859 leaderActor.underlyingActor().setBehavior(leader);
1861 for(int i=1;i<6;i++) {
1862 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1863 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1864 assertTrue(newBehavior == leader);
1865 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1868 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1869 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1871 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1872 heartbeats.size() > 1);
1874 // Check if follower-2 got AppendEntries during this time and was not starved
1875 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1877 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1878 appendEntries.size() > 1);
1884 public void testReplicationConsensusWithNonVotingFollower() {
1885 logStart("testReplicationConsensusWithNonVotingFollower");
1887 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1888 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1889 new FiniteDuration(1000, TimeUnit.SECONDS));
1891 leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1893 String nonVotingFollowerId = "nonvoting-follower";
1894 TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1895 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1897 leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1899 leader = new Leader(leaderActorContext);
1901 // Ignore initial heartbeats
1902 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1903 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1905 MessageCollectorActor.clearMessages(followerActor);
1906 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1907 MessageCollectorActor.clearMessages(leaderActor);
1909 // Send a Replicate message and wait for AppendEntries.
1910 sendReplicate(leaderActorContext, 0);
1912 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1913 MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1915 // Send reply only from the voting follower and verify consensus via ApplyState.
1916 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1918 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1920 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
1922 MessageCollectorActor.clearMessages(followerActor);
1923 MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1924 MessageCollectorActor.clearMessages(leaderActor);
1926 // Send another Replicate message
1927 sendReplicate(leaderActorContext, 1);
1929 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1930 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
1931 AppendEntries.class);
1932 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1933 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
1935 // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
1936 leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
1938 MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
1940 // Send reply from the voting follower and verify consensus.
1941 leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1943 MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1947 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1948 ActorRef actorRef, RaftRPC rpc) throws Exception {
1949 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1950 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1953 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1955 private final long electionTimeOutIntervalMillis;
1956 private final int snapshotChunkSize;
1958 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1960 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1961 this.snapshotChunkSize = snapshotChunkSize;
1965 public FiniteDuration getElectionTimeOutInterval() {
1966 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1970 public int getSnapshotChunkSize() {
1971 return snapshotChunkSize;