2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.actor.Terminated;
18 import akka.testkit.JavaTestKit;
19 import akka.testkit.TestActorRef;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.util.concurrent.Uninterruptibles;
22 import com.google.protobuf.ByteString;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.List;
27 import java.util.concurrent.TimeUnit;
28 import org.junit.After;
29 import org.junit.Assert;
30 import org.junit.Test;
31 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
32 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
33 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
34 import org.opendaylight.controller.cluster.raft.RaftActorContext;
35 import org.opendaylight.controller.cluster.raft.RaftState;
36 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
37 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
38 import org.opendaylight.controller.cluster.raft.SerializationUtils;
39 import org.opendaylight.controller.cluster.raft.Snapshot;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
41 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
42 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
44 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
45 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
46 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
47 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
48 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
49 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
50 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
51 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
52 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
53 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
54 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
55 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
56 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
57 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
58 import scala.concurrent.duration.FiniteDuration;
60 public class LeaderTest extends AbstractLeaderTest {
// Peer id used for the follower in simulated replies and in leader.getFollower(...) lookups.
62 static final String FOLLOWER_ID = "follower";
// Id string for the leader.
63 public static final String LEADER_ID = "leader";
// Test actor standing in for the leader: used as the sender when local messages (SendHeartBeat, Replicate,
// SendInstallSnapshot) are fed to leader.handleMessage(), and inspected for ApplyState messages.
65 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
66 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
// Test actor standing in for the follower: tests read the AppendEntries / InstallSnapshot messages it received
// through MessageCollectorActor and reset it with underlyingActor().clear().
68 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
69 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
// Behavior under test; each test assigns a new Leader instance.
71 private Leader leader;
// Payload version constant (5) available to the tests.
72 private final short payloadVersion = 5;
76 public void tearDown() throws Exception {
// Verifies that handling an unrecognised message (the String "foo") returns a behavior that is still a Leader.
85 public void testHandleMessageForUnknownMessage() throws Exception {
86 logStart("testHandleMessageForUnknownMessage");
88 leader = new Leader(createActorContext());
90 // handle message should return the Leader state when it receives an
92 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
93 Assert.assertTrue(behavior instanceof Leader);
// Verifies the two AppendEntries a new Leader sends to its follower:
//  1. the immediate heartbeat on construction, which carries no entries and prevLogIndex/prevLogTerm of -1;
//  2. after a simulated reply acknowledging lastIndex - 1 and a SendHeartBeat, a message carrying exactly the
//     entry at lastIndex.
// Both messages must carry the payload version configured on the context. The class-level payloadVersion
// field is used directly; the former local of the same name and value only shadowed it.
97 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
98 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
100 MockRaftActorContext actorContext = createActorContextWithFollower();
102 actorContext.setPayloadVersion(payloadVersion);
105 actorContext.getTermInformation().update(term, "");
107 leader = new Leader(actorContext);
109 // Leader should send an immediate heartbeat with no entries as follower is inactive.
110 long lastIndex = actorContext.getReplicatedLog().lastIndex();
111 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
112 assertEquals("getTerm", term, appendEntries.getTerm());
113 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
114 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
115 assertEquals("Entries size", 0, appendEntries.getEntries().size());
116 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
118 // The follower would normally reply - simulate that explicitly here.
// The reply acknowledges lastIndex - 1, so the follower is still one entry behind the leader.
119 leader.handleMessage(followerActor, new AppendEntriesReply(
120 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
121 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so the next expectFirstMatching sees only the new message.
123 followerActor.underlyingActor().clear();
125 // Sleep for the heartbeat interval so AppendEntries is sent.
126 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
127 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
129 leader.handleMessage(leaderActor, new SendHeartBeat());
131 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
132 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
133 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
134 assertEquals("Entries size", 1, appendEntries.getEntries().size());
135 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
136 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
137 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
// Helper: creates a MockReplicatedLogEntry with a "foo" payload, appends it to the context's replicated log and
// passes a Replicate message for it (null client actor and identifier) to the leader.
// Returns the behavior produced by leader.handleMessage().
// NOTE(review): the entry's constructor arguments are not visible in this excerpt; presumably `index` becomes
// the entry's log index - confirm.
141 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
142 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
143 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
145 actorContext.getReplicatedLog().append(newEntry);
146 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
// Verifies that a Replicate for a new entry makes the leader send an AppendEntries containing exactly that entry
// (index lastIndex + 1, payload "foo") to the follower. No reply for the new entry is simulated, and the commit
// index is expected to remain at lastIndex.
150 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
151 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
153 MockRaftActorContext actorContext = createActorContextWithFollower();
156 actorContext.getTermInformation().update(term, "");
158 leader = new Leader(actorContext);
160 // Leader will send an immediate heartbeat - ignore it.
161 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
163 // The follower would normally reply - simulate that explicitly here.
164 long lastIndex = actorContext.getReplicatedLog().lastIndex();
165 leader.handleMessage(followerActor, new AppendEntriesReply(
166 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
167 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only messages triggered by the Replicate are inspected below.
169 followerActor.underlyingActor().clear();
171 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
173 // State should not change
174 assertTrue(raftBehavior instanceof Leader);
176 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
177 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
178 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
179 assertEquals("Entries size", 1, appendEntries.getEntries().size());
180 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
181 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
182 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
183 assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
// Same flow as testHandleReplicateMessageSendAppendEntriesToFollower, but with a RaftPolicy obtained from
// createRaftPolicy(true, true). Here the commit index is expected to advance to lastIndex + 1 even though no
// reply for the new entry is simulated.
187 public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
188 logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
190 MockRaftActorContext actorContext = createActorContextWithFollower();
191 actorContext.setRaftPolicy(createRaftPolicy(true, true));
194 actorContext.getTermInformation().update(term, "");
196 leader = new Leader(actorContext);
198 // Leader will send an immediate heartbeat - ignore it.
199 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
201 // The follower would normally reply - simulate that explicitly here.
202 long lastIndex = actorContext.getReplicatedLog().lastIndex();
203 leader.handleMessage(followerActor, new AppendEntriesReply(
204 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
205 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only messages triggered by the Replicate are inspected below.
207 followerActor.underlyingActor().clear();
209 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
211 // State should not change
212 assertTrue(raftBehavior instanceof Leader);
214 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
215 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
216 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
217 assertEquals("Entries size", 1, appendEntries.getEntries().size());
218 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
219 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
220 assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// Unlike the default-policy test, the new entry is expected to be committed already.
221 assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
// Verifies that five back-to-back Replicate messages, with no follower reply in between and a 5 second
// heartbeat interval, result in only a single AppendEntries being sent to the follower.
225 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
// Log this test's own name (previously a copy-pasted name of another test was logged).
226 logStart("testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent");
228 MockRaftActorContext actorContext = createActorContextWithFollower();
// Long heartbeat interval so that it does not elapse while the test runs.
229 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
231 public FiniteDuration getHeartBeatInterval() {
232 return FiniteDuration.apply(5, TimeUnit.SECONDS);
237 actorContext.getTermInformation().update(term, "");
239 leader = new Leader(actorContext);
241 // Leader will send an immediate heartbeat - ignore it.
242 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
244 // The follower would normally reply - simulate that explicitly here.
245 long lastIndex = actorContext.getReplicatedLog().lastIndex();
246 leader.handleMessage(followerActor, new AppendEntriesReply(
247 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
248 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only messages triggered by the Replicates are counted.
250 followerActor.underlyingActor().clear();
252 for(int i=0;i<5;i++) {
253 sendReplicate(actorContext, lastIndex+i+1);
256 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
257 // We expect only 1 message to be sent because of two reasons,
258 // - an append entries reply was not received
259 // - the heartbeat interval has not expired
260 // In this scenario if multiple messages are sent they would likely be duplicates
261 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
// Verifies that when each of the first three Replicate messages is followed by a follower reply, a fourth
// Replicate is sent out as well, while a fifth (with no reply to the fourth) is not: four AppendEntries total.
265 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
266 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
268 MockRaftActorContext actorContext = createActorContextWithFollower();
// Long heartbeat interval so that it does not elapse while the test runs.
269 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
271 public FiniteDuration getHeartBeatInterval() {
272 return FiniteDuration.apply(5, TimeUnit.SECONDS);
277 actorContext.getTermInformation().update(term, "");
279 leader = new Leader(actorContext);
281 // Leader will send an immediate heartbeat - ignore it.
282 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
284 // The follower would normally reply - simulate that explicitly here.
285 long lastIndex = actorContext.getReplicatedLog().lastIndex();
286 leader.handleMessage(followerActor, new AppendEntriesReply(
287 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
288 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
290 followerActor.underlyingActor().clear();
// First three entries: each Replicate is immediately acknowledged by the follower.
292 for(int i=0;i<3;i++) {
293 sendReplicate(actorContext, lastIndex+i+1);
294 leader.handleMessage(followerActor, new AppendEntriesReply(
295 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
// Last two entries: no acknowledgement is simulated.
299 for(int i=3;i<5;i++) {
300 sendReplicate(actorContext, lastIndex + i + 1);
303 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
304 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
305 // get sent to the follower - but not the 5th
306 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
// The i-th message must start with entry lastIndex + i + 1. The expected value is derived from lastIndex
// instead of a literal offset, and is passed first so a failure reports expected/actual the right way round.
308 for(int i=0;i<4;i++) {
309 long actualIndex = allMessages.get(i).getEntries().get(0).getIndex();
310 assertEquals(lastIndex + i + 1, actualIndex);
// Verifies that an unacknowledged entry is re-sent on the next heartbeat: after one Replicate and, once the
// 500 ms heartbeat interval has elapsed, one SendHeartBeat, the follower must have received two AppendEntries
// that each carry the single entry at lastIndex + 1.
315 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
316 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
318 MockRaftActorContext actorContext = createActorContextWithFollower();
319 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
321 public FiniteDuration getHeartBeatInterval() {
322 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
327 actorContext.getTermInformation().update(term, "");
329 leader = new Leader(actorContext);
331 // Leader will send an immediate heartbeat - ignore it.
332 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
334 // The follower would normally reply - simulate that explicitly here.
335 long lastIndex = actorContext.getReplicatedLog().lastIndex();
336 leader.handleMessage(followerActor, new AppendEntriesReply(
337 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
338 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
340 followerActor.underlyingActor().clear();
342 sendReplicate(actorContext, lastIndex+1);
344 // Wait slightly longer than heartbeat duration
345 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
347 leader.handleMessage(leaderActor, new SendHeartBeat());
349 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
350 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
352 assertEquals(1, allMessages.get(0).getEntries().size());
353 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
354 assertEquals(1, allMessages.get(1).getEntries().size());
// Check the entry index of the SECOND message (the duplicate sent on heartbeat); this assertion previously
// re-checked message 0, leaving the duplicate's content unverified.
355 assertEquals(lastIndex+1, allMessages.get(1).getEntries().get(0).getIndex());
// Verifies that, with a 100 ms heartbeat interval, three SendHeartBeat messages each delivered after a 150 ms
// pause result in three AppendEntries being sent to the follower.
360 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
361 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
363 MockRaftActorContext actorContext = createActorContextWithFollower();
364 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
366 public FiniteDuration getHeartBeatInterval() {
367 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
372 actorContext.getTermInformation().update(term, "");
374 leader = new Leader(actorContext);
376 // Leader will send an immediate heartbeat - ignore it.
377 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
379 // The follower would normally reply - simulate that explicitly here.
380 long lastIndex = actorContext.getReplicatedLog().lastIndex();
381 leader.handleMessage(followerActor, new AppendEntriesReply(
382 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
383 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only the three heartbeats below are counted.
385 followerActor.underlyingActor().clear();
// Each pause (150 ms) is longer than the configured heartbeat interval (100 ms).
387 for(int i=0;i<3;i++) {
388 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
389 leader.handleMessage(leaderActor, new SendHeartBeat());
392 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
393 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
// Verifies that a Replicate arriving right after a heartbeat is still sent out: the follower must receive two
// AppendEntries, the first (heartbeat) with no entries and the second with one entry.
397 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
398 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
400 MockRaftActorContext actorContext = createActorContextWithFollower();
401 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
403 public FiniteDuration getHeartBeatInterval() {
404 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
409 actorContext.getTermInformation().update(term, "");
411 leader = new Leader(actorContext);
413 // Leader will send an immediate heartbeat - ignore it.
414 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
416 // The follower would normally reply - simulate that explicitly here.
417 long lastIndex = actorContext.getReplicatedLog().lastIndex();
418 leader.handleMessage(followerActor, new AppendEntriesReply(
419 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
420 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
422 followerActor.underlyingActor().clear();
// Wait past the 100 ms heartbeat interval, send the heartbeat, then replicate with no pause in between.
424 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
425 leader.handleMessage(leaderActor, new SendHeartBeat());
426 sendReplicate(actorContext, lastIndex+1);
428 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
429 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
431 assertEquals(0, allMessages.get(0).getEntries().size());
432 assertEquals(1, allMessages.get(1).getEntries().size());
// Verifies Replicate handling on a context created by createActorContext() (no follower is configured in this
// test): the commit index must move to the new entry's index and ApplyState messages must be delivered to the
// leader actor for every index from 1 up to the new entry, the last one carrying the "state-id" identifier.
437 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
438 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
440 MockRaftActorContext actorContext = createActorContext();
442 leader = new Leader(actorContext);
444 actorContext.setLastApplied(0);
446 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
447 long term = actorContext.getTermInformation().getCurrentTerm();
448 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
449 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
451 actorContext.getReplicatedLog().append(newEntry);
453 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
454 new Replicate(leaderActor, "state-id", newEntry));
456 // State should not change
457 assertTrue(raftBehavior instanceof Leader);
459 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
461 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
462 // one since lastApplied state is 0.
463 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
464 leaderActor, ApplyState.class);
465 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
// ApplyState number i is expected to carry the log entry with index i + 1.
467 for(int i = 0; i <= newLogIndex - 1; i++ ) {
468 ApplyState applyState = applyStateList.get(i);
469 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
470 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
// The final ApplyState corresponds to the newly replicated entry.
473 ApplyState last = applyStateList.get((int) newLogIndex - 1);
474 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
475 assertEquals("getIdentifier", "state-id", last.getIdentifier());
// Verifies heartbeat behavior while a snapshot install to the follower is in progress:
//  - while the current chunk has not been acknowledged, a heartbeat produces an AppendEntries with no entries;
//  - once the chunk is marked as successfully sent, the next heartbeat produces an InstallSnapshot whose
//    lastIncludedIndex equals commitIndex.
479 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
480 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
482 MockRaftActorContext actorContext = createActorContextWithFollower();
// Snapshot content that is serialised with toByteString() further down.
484 Map<String, String> leadersSnapshot = new HashMap<>();
485 leadersSnapshot.put("1", "A");
486 leadersSnapshot.put("2", "B");
487 leadersSnapshot.put("3", "C");
490 actorContext.getReplicatedLog().removeFrom(0);
492 final int commitIndex = 3;
493 final int snapshotIndex = 2;
494 final int newEntryIndex = 4;
495 final int snapshotTerm = 1;
496 final int currentTerm = 2;
498 // set the snapshot variables in replicatedlog
499 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
500 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
501 actorContext.setCommitIndex(commitIndex);
502 //set follower timeout to 2 mins, helps during debugging
503 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
505 leader = new Leader(actorContext);
// Put the follower's match/next index at the very start of the log.
507 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
508 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// NOTE(review): `entry` is not referenced again in the visible lines of this test - confirm whether it is needed.
511 ReplicatedLogImplEntry entry =
512 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
513 new MockRaftActorContext.MockPayload("D"));
515 //update follower timestamp
516 leader.markFollowerActive(FOLLOWER_ID);
// Register the snapshot on the leader and a per-follower snapshot-transfer tracker for FOLLOWER_ID.
518 ByteString bs = toByteString(leadersSnapshot);
519 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
520 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
521 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
522 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
524 //send first chunk and no InstallSnapshotReply received yet
526 fts.incrementChunkIndex();
528 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
529 TimeUnit.MILLISECONDS);
531 leader.handleMessage(leaderActor, new SendHeartBeat());
533 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
535 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
537 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
539 //InstallSnapshotReply received
540 fts.markSendStatus(true);
542 leader.handleMessage(leaderActor, new SendHeartBeat());
544 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
546 assertEquals(commitIndex, is.getLastIncludedIndex());
// Verifies that replicating a new entry, with the replicated log's snapshot index set to 3 and the commit
// index set to 2, leaves the behavior as Leader and puts the context's snapshot manager into the capturing
// state.
550 public void testSendAppendEntriesSnapshotScenario() throws Exception {
551 logStart("testSendAppendEntriesSnapshotScenario");
553 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): leadersSnapshot is populated but not used in the visible lines of this test.
555 Map<String, String> leadersSnapshot = new HashMap<>();
556 leadersSnapshot.put("1", "A");
557 leadersSnapshot.put("2", "B");
558 leadersSnapshot.put("3", "C");
561 actorContext.getReplicatedLog().removeFrom(0);
563 final int followersLastIndex = 2;
564 final int snapshotIndex = 3;
565 final int newEntryIndex = 4;
566 final int snapshotTerm = 1;
567 final int currentTerm = 2;
569 // set the snapshot variables in replicatedlog
570 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
571 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
572 actorContext.setCommitIndex(followersLastIndex);
574 leader = new Leader(actorContext);
576 // Leader will send an immediate heartbeat - ignore it.
577 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
580 ReplicatedLogImplEntry entry =
581 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
582 new MockRaftActorContext.MockPayload("D"));
584 actorContext.getReplicatedLog().append(entry);
586 //update follower timestamp
587 leader.markFollowerActive(FOLLOWER_ID);
589 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
590 RaftActorBehavior raftBehavior = leader.handleMessage(
591 leaderActor, new Replicate(null, "state-id", entry));
593 assertTrue(raftBehavior instanceof Leader);
595 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
// Verifies that, with no snapshot set on the leader, a Replicate starts a snapshot capture flagged as
// install-snapshot-initiated with the expected last-applied / last-entry index and term, and that a second
// Replicate while the capture is in progress keeps the same CaptureSnapshot instance.
599 public void testInitiateInstallSnapshot() throws Exception {
600 logStart("testInitiateInstallSnapshot");
602 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): leadersSnapshot is populated but not used in the visible lines of this test.
604 Map<String, String> leadersSnapshot = new HashMap<>();
605 leadersSnapshot.put("1", "A");
606 leadersSnapshot.put("2", "B");
607 leadersSnapshot.put("3", "C");
610 actorContext.getReplicatedLog().removeFrom(0);
612 final int followersLastIndex = 2;
613 final int snapshotIndex = 3;
614 final int newEntryIndex = 4;
615 final int snapshotTerm = 1;
616 final int currentTerm = 2;
618 // set the snapshot variables in replicatedlog
619 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
620 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
621 actorContext.setLastApplied(3);
622 actorContext.setCommitIndex(followersLastIndex);
624 leader = new Leader(actorContext);
626 // Leader will send an immediate heartbeat - ignore it.
627 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
629 // set the snapshot as absent and check if capture-snapshot is invoked.
630 leader.setSnapshot(null);
633 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
634 new MockRaftActorContext.MockPayload("D"));
636 actorContext.getReplicatedLog().append(entry);
638 //update follower timestamp
639 leader.markFollowerActive(FOLLOWER_ID);
641 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
643 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
645 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
// Expected values mirror the setup above: lastApplied = 3 with snapshotTerm = 1, and the appended entry at
// newEntryIndex = 4 with currentTerm = 2.
647 assertTrue(cs.isInstallSnapshotInitiated());
648 assertEquals(3, cs.getLastAppliedIndex());
649 assertEquals(1, cs.getLastAppliedTerm());
650 assertEquals(4, cs.getLastIndex());
651 assertEquals(2, cs.getLastTerm());
653 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
654 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
656 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
// Verifies that a SendInstallSnapshot message makes the leader send an InstallSnapshot to the follower whose
// data is non-null, whose lastIncludedIndex/lastIncludedTerm match the snapshot, and whose term is the
// leader's current term.
660 public void testInstallSnapshot() throws Exception {
661 logStart("testInstallSnapshot");
663 MockRaftActorContext actorContext = createActorContextWithFollower();
665 Map<String, String> leadersSnapshot = new HashMap<>();
666 leadersSnapshot.put("1", "A");
667 leadersSnapshot.put("2", "B");
668 leadersSnapshot.put("3", "C");
671 actorContext.getReplicatedLog().removeFrom(0);
673 final int lastAppliedIndex = 3;
674 final int snapshotIndex = 2;
675 final int snapshotTerm = 1;
676 final int currentTerm = 2;
678 // set the snapshot variables in replicatedlog
679 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
680 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
681 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
682 actorContext.setCommitIndex(lastAppliedIndex);
683 actorContext.setLastApplied(lastAppliedIndex);
685 leader = new Leader(actorContext);
687 // Initial heartbeat.
688 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Put the follower's match/next index at the very start of the log.
690 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
691 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
693 Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
694 Collections.<ReplicatedLogEntry>emptyList(),
695 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
697 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
699 assertTrue(raftBehavior instanceof Leader);
701 // check if installsnapshot gets called with the correct values.
703 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
705 assertNotNull(installSnapshot.getData());
706 assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
707 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
709 assertEquals(currentTerm, installSnapshot.getTerm());
// Verifies handling of a successful InstallSnapshotReply for the last chunk: the per-follower snapshot state
// must be removed (followerSnapshotSize() == 0) while the follower itself stays tracked, and the follower's
// matchIndex / nextIndex must become commitIndex / commitIndex + 1.
713 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
714 logStart("testHandleInstallSnapshotReplyLastChunk");
716 MockRaftActorContext actorContext = createActorContextWithFollower();
718 final int commitIndex = 3;
719 final int snapshotIndex = 2;
720 final int snapshotTerm = 1;
721 final int currentTerm = 2;
723 actorContext.setCommitIndex(commitIndex);
725 leader = new Leader(actorContext);
// Put the follower's match/next index at the very start of the log.
727 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
728 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
730 // Ignore initial heartbeat.
731 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
733 Map<String, String> leadersSnapshot = new HashMap<>();
734 leadersSnapshot.put("1", "A");
735 leadersSnapshot.put("2", "B");
736 leadersSnapshot.put("3", "C");
738 // set the snapshot variables in replicatedlog
740 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
741 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
742 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
744 ByteString bs = toByteString(leadersSnapshot);
745 leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
746 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
747 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
748 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
// Advance the tracker until its current chunk index is the last chunk.
749 while(!fts.isLastChunk(fts.getChunkIndex())) {
751 fts.incrementChunkIndex();
755 actorContext.getReplicatedLog().removeFrom(0);
// Simulate the follower's successful reply for the last chunk.
757 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
758 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
760 assertTrue(raftBehavior instanceof Leader);
762 assertEquals(0, leader.followerSnapshotSize());
763 assertEquals(1, leader.followerLogSize());
764 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
766 assertEquals(commitIndex, fli.getMatchIndex());
767 assertEquals(commitIndex + 1, fli.getNextIndex());
// Verifies chunked snapshot transfer driven by InstallSnapshotReply messages: the snapshot is expected to be
// split into 3 chunks, each successful reply triggers the next chunk, and a reply to the final chunk must not
// cause another InstallSnapshot to be sent.
771 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
772 logStart("testSendSnapshotfromInstallSnapshotReply");
774 MockRaftActorContext actorContext = createActorContextWithFollower();
776 final int commitIndex = 3;
777 final int snapshotIndex = 2;
778 final int snapshotTerm = 1;
779 final int currentTerm = 2;
// Config with an overridden snapshot chunk size (the value is not visible in this excerpt) and heartbeat /
// isolated-leader-check intervals of 9 s and 10 s.
781 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
783 public int getSnapshotChunkSize() {
787 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
788 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
790 actorContext.setConfigParams(configParams);
791 actorContext.setCommitIndex(commitIndex);
793 leader = new Leader(actorContext);
795 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
796 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
798 Map<String, String> leadersSnapshot = new HashMap<>();
799 leadersSnapshot.put("1", "A");
800 leadersSnapshot.put("2", "B");
801 leadersSnapshot.put("3", "C");
803 // set the snapshot variables in replicatedlog
804 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
805 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
806 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
808 ByteString bs = toByteString(leadersSnapshot);
809 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
810 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
811 leader.setSnapshot(snapshot);
// Start the transfer; the first chunk is expected immediately.
813 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
815 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
817 assertEquals(1, installSnapshot.getChunkIndex());
818 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 1; chunk 2 is expected next.
820 followerActor.underlyingActor().clear();
821 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
822 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
824 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
826 assertEquals(2, installSnapshot.getChunkIndex());
827 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 2; only the arrival of the following chunk is checked, not its chunk index.
829 followerActor.underlyingActor().clear();
830 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
831 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
833 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
835 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
836 followerActor.underlyingActor().clear();
837 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
838 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
// getFirstMatching (unlike expectFirstMatching) is used because the absence of a message is asserted below.
840 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
842 Assert.assertNull(installSnapshot);
/**
 * Verifies that after an unsuccessful InstallSnapshotReply carrying chunk index -1, the next heartbeat
 * makes the leader send an InstallSnapshot with chunk index 1 again.
 */
847 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
848 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
850 MockRaftActorContext actorContext = createActorContextWithFollower();
852 final int commitIndex = 3;
853 final int snapshotIndex = 2;
854 final int snapshotTerm = 1;
855 final int currentTerm = 2;
// Config subclass that overrides getSnapshotChunkSize(); the assertions below expect 3 chunks in total.
857 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
859 public int getSnapshotChunkSize() {
864 actorContext.setCommitIndex(commitIndex);
866 leader = new Leader(actorContext);
// Reset the follower's tracked match index and next index to -1 and 0.
868 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
869 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Snapshot payload: a small map that is serialized to a ByteString below.
871 Map<String, String> leadersSnapshot = new HashMap<>();
872 leadersSnapshot.put("1", "A");
873 leadersSnapshot.put("2", "B");
874 leadersSnapshot.put("3", "C");
876 // Set the snapshot index and term on the replicated log.
877 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
878 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
879 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
881 ByteString bs = toByteString(leadersSnapshot);
882 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
883 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
884 leader.setSnapshot(snapshot);
// NOTE(review): one-second pause before starting the install; the reason is not evident here - confirm it is needed.
886 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
887 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
889 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
891 assertEquals(1, installSnapshot.getChunkIndex());
892 assertEquals(3, installSnapshot.getTotalChunks());
894 followerActor.underlyingActor().clear();
// Reply with chunk index -1 and success == false.
896 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
897 FOLLOWER_ID, -1, false));
// Wait one configured heartbeat interval before delivering SendHeartBeat to the leader.
899 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
900 TimeUnit.MILLISECONDS);
902 leader.handleMessage(leaderActor, new SendHeartBeat());
// The follower is expected to receive chunk 1 of 3 again.
904 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
906 assertEquals(1, installSnapshot.getChunkIndex());
907 assertEquals(3, installSnapshot.getTotalChunks());
/**
 * Verifies the "last chunk hash code" carried by InstallSnapshot: the first chunk carries
 * AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE and the second chunk carries the hash code of the first
 * chunk's data.
 */
911 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
912 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
914 MockRaftActorContext actorContext = createActorContextWithFollower();
916 final int commitIndex = 3;
917 final int snapshotIndex = 2;
918 final int snapshotTerm = 1;
919 final int currentTerm = 2;
// Config subclass that overrides getSnapshotChunkSize(); the assertions below expect 3 chunks in total.
921 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
923 public int getSnapshotChunkSize() {
928 actorContext.setCommitIndex(commitIndex);
930 leader = new Leader(actorContext);
// Reset the follower's tracked match index and next index to -1 and 0.
932 leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
933 leader.getFollower(FOLLOWER_ID).setNextIndex(0);
// Snapshot payload: a small map that is serialized to a ByteString below.
935 Map<String, String> leadersSnapshot = new HashMap<>();
936 leadersSnapshot.put("1", "A");
937 leadersSnapshot.put("2", "B");
938 leadersSnapshot.put("3", "C");
940 // Set the snapshot index and term on the replicated log.
941 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
942 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
943 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
945 ByteString bs = toByteString(leadersSnapshot);
946 Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
947 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
948 leader.setSnapshot(snapshot);
950 leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
952 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
954 assertEquals(1, installSnapshot.getChunkIndex());
955 assertEquals(3, installSnapshot.getTotalChunks());
// The first chunk has no predecessor, so the initial hash-code constant is expected.
956 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
// Remember the hash code of chunk 1's data for comparison against what chunk 2 reports.
958 int hashCode = installSnapshot.getData().hashCode();
960 followerActor.underlyingActor().clear();
// Acknowledge chunk 1 with a successful reply.
962 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
963 FOLLOWER_ID, 1, true));
965 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
967 assertEquals(2, installSnapshot.getChunkIndex());
968 assertEquals(3, installSnapshot.getTotalChunks());
969 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
/**
 * Exercises the FollowerToSnapshot chunking helper directly: steps through the serialized snapshot in
 * 50-byte increments and checks the size and index of each chunk returned by getNextChunk(), then checks
 * the total chunk count.
 */
973 public void testFollowerToSnapshotLogic() {
974 logStart("testFollowerToSnapshotLogic");
976 MockRaftActorContext actorContext = createActorContext();
// Config subclass that overrides getSnapshotChunkSize()
// (presumably returning 50, to match the step used by the loop below - TODO confirm).
978 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
980 public int getSnapshotChunkSize() {
985 leader = new Leader(actorContext);
987 Map<String, String> leadersSnapshot = new HashMap<>();
988 leadersSnapshot.put("1", "A");
989 leadersSnapshot.put("2", "B");
990 leadersSnapshot.put("3", "C");
992 ByteString bs = toByteString(leadersSnapshot);
993 byte[] barray = bs.toByteArray();
// FollowerToSnapshot is created through the leader instance and registered for FOLLOWER_ID.
995 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
996 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
998 assertEquals(bs.size(), barray.length);
// i is the start offset of the current chunk; j - i is asserted below as the expected chunk size.
1001 for (int i=0; i < barray.length; i = i + 50) {
// Branch taken when fewer than 50 bytes remain, i.e. for the final, shorter chunk.
1005 if (i + 50 > barray.length) {
1009 ByteString chunk = fts.getNextChunk();
1010 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
1011 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
// Mark the chunk as successfully sent and advance the chunk index unless this was the last chunk.
1013 fts.markSendStatus(true);
1014 if (!fts.isLastChunk(chunkIndex)) {
1015 fts.incrementChunkIndex();
1019 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
/**
 * Creates the behavior under test: a new Leader built on the given actor context.
 *
 * @param actorContext the Raft actor context passed to the Leader constructor
 * @return a new Leader instance
 */
1022 @Override protected RaftActorBehavior createBehavior(
1023 RaftActorContext actorContext) {
1024 return new Leader(actorContext);
/**
 * Creates a mock actor context bound to {@code leaderActor}.
 *
 * @return the context produced by {@code createActorContext(leaderActor)}
 */
1028 protected MockRaftActorContext createActorContext() {
1029 return createActorContext(leaderActor);
/**
 * Creates a mock actor context with id {@code LEADER_ID} for the given actor.
 *
 * @param actorRef the actor the context is created for
 * @return the context produced by {@code createActorContext(LEADER_ID, actorRef)}
 */
1033 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1034 return createActorContext(LEADER_ID, actorRef);
/**
 * Creates the leader's mock actor context and registers a single peer: {@code FOLLOWER_ID}, mapped to the
 * path of {@code followerActor}.
 *
 * @return the leader context with the follower set as its only peer address
 */
1037 private MockRaftActorContext createActorContextWithFollower() {
1038 MockRaftActorContext actorContext = createActorContext();
1039 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1040 followerActor.path().toString()).build());
1041 return actorContext;
/**
 * Builds a MockRaftActorContext for the given id and actor, configured with a 50 ms heartbeat interval,
 * an election timeout factor of 100000 and this test's {@code payloadVersion}.
 *
 * @param id the member id passed to the MockRaftActorContext constructor
 * @param actorRef the actor passed to the MockRaftActorContext constructor
 */
1044 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1045 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1046 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
// Very large factor (presumably to keep election timeouts from firing during tests - TODO confirm).
1047 configParams.setElectionTimeoutFactor(100000);
1048 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1049 context.setConfigParams(configParams);
1050 context.setPayloadVersion(payloadVersion);
/**
 * Creates a mock actor context for {@code FOLLOWER_ID} / {@code followerActor} whose only peer is the
 * leader. The config set by createActorContext is replaced with a fresh DefaultConfigParamsImpl that only
 * has the election timeout factor set (to 10000).
 *
 * @return the follower context with the leader registered as its peer
 */
1054 private MockRaftActorContext createFollowerActorContextWithLeader() {
1055 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1056 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1057 followerConfig.setElectionTimeoutFactor(10000);
1058 followerActorContext.setConfigParams(followerConfig);
1059 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1060 return followerActorContext;
/**
 * Creates a leader whose commit index (1) is behind the last log index checked below (2), with a follower
 * that has the same log and commit index, and verifies the initial AppendEntries sent on leader creation
 * and the follower's reply to it.
 */
1064 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1065 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1067 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1069 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Install a real Follower behavior on the follower actor
// (presumably so that it answers the leader's messages - TODO confirm).
1071 Follower follower = new Follower(followerActorContext);
1072 followerActor.underlyingActor().setBehavior(follower);
// NOTE(review): createActorContextWithFollower() already registered this same peer address.
1074 Map<String, String> peerAddresses = new HashMap<>();
1075 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1077 leaderActorContext.setPeerAddresses(peerAddresses);
// Remove the existing log entries from index 0 before installing the test log.
1079 leaderActorContext.getReplicatedLog().removeFrom(0);
1082 leaderActorContext.setReplicatedLog(
1083 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1085 leaderActorContext.setCommitIndex(1);
1087 followerActorContext.getReplicatedLog().removeFrom(0);
1089 // The follower has exactly the same log entries and the same commit index as the leader.
1090 followerActorContext.setReplicatedLog(
1091 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1093 followerActorContext.setCommitIndex(1);
// An AppendEntries is expected at the follower right after the Leader is constructed.
1095 leader = new Leader(leaderActorContext);
1097 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1099 assertEquals(1, appendEntries.getLeaderCommit());
1100 assertEquals(0, appendEntries.getEntries().size());
1101 assertEquals(0, appendEntries.getPrevLogIndex());
// The follower's reply is collected at the leader actor.
1103 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1104 leaderActor, AppendEntriesReply.class);
1106 assertEquals(2, appendEntriesReply.getLogLastIndex());
1107 assertEquals(1, appendEntriesReply.getLogLastTerm());
1109 // NOTE(review): these two assertions repeat the checks directly above (last log index and term), not a next index.
1110 assertEquals(2, appendEntriesReply.getLogLastIndex());
1111 assertEquals(1, appendEntriesReply.getLogLastTerm());
/**
 * Creates a leader whose commit index (1) is lower than its follower's (2) while both hold the same log.
 * Verifies the initial AppendEntries/reply exchange, then that after the leader handles the reply and a
 * heartbeat, the next AppendEntries carries leader commit 2 and the follower's commit index is still 2.
 */
1117 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1118 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1120 MockRaftActorContext leaderActorContext = createActorContext();
1122 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1123 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
// Install a real Follower behavior on the follower actor
// (presumably so that it answers the leader's messages - TODO confirm).
1125 Follower follower = new Follower(followerActorContext);
1126 followerActor.underlyingActor().setBehavior(follower);
1128 Map<String, String> leaderPeerAddresses = new HashMap<>();
1129 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1131 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
// Remove the existing log entries from index 0 before installing the test log.
1133 leaderActorContext.getReplicatedLog().removeFrom(0);
1135 leaderActorContext.setReplicatedLog(
1136 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1138 leaderActorContext.setCommitIndex(1);
1140 followerActorContext.getReplicatedLog().removeFrom(0);
1142 followerActorContext.setReplicatedLog(
1143 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1145 // The follower has the same log entries, but its commit index is greater than the leader's.
1146 followerActorContext.setCommitIndex(2);
1148 leader = new Leader(leaderActorContext);
1150 // Initial heartbeat
1151 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1153 assertEquals(1, appendEntries.getLeaderCommit());
1154 assertEquals(0, appendEntries.getEntries().size());
1155 assertEquals(0, appendEntries.getPrevLogIndex());
1157 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1158 leaderActor, AppendEntriesReply.class);
1160 assertEquals(2, appendEntriesReply.getLogLastIndex());
1161 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): this installs the Follower behavior on the *leader* actor; sibling tests pass `leader` here - confirm intent.
1163 leaderActor.underlyingActor().setBehavior(follower);
// Feed the follower's reply to the Leader behavior directly.
1164 leader.handleMessage(followerActor, appendEntriesReply);
1166 leaderActor.underlyingActor().clear();
1167 followerActor.underlyingActor().clear();
// Wait one configured heartbeat interval before delivering SendHeartBeat to the leader.
1169 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1170 TimeUnit.MILLISECONDS);
1172 leader.handleMessage(leaderActor, new SendHeartBeat());
// The heartbeat AppendEntries is expected to carry commit index 2 and prevLogIndex 2, with no entries.
1174 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1176 assertEquals(2, appendEntries.getLeaderCommit());
1177 assertEquals(0, appendEntries.getEntries().size());
1178 assertEquals(2, appendEntries.getPrevLogIndex());
1180 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1182 assertEquals(2, appendEntriesReply.getLogLastIndex());
1183 assertEquals(1, appendEntriesReply.getLogLastTerm());
1185 assertEquals(2, followerActorContext.getCommitIndex());
/**
 * Follower log is shorter than the leader's. After the leader handles the follower's reply to the initial
 * AppendEntries (presumably a failure reply, per the test name), it is expected to send the entries at
 * indices 1 and 2, and the follower is expected to apply them and end with commit index and last index 2.
 */
1191 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1192 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1194 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// 1000-second heartbeat interval (presumably so scheduled heartbeats do not interfere - TODO confirm).
1195 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1196 new FiniteDuration(1000, TimeUnit.SECONDS));
// Leader log built with createEntries(0, 3, 1); commit index and last applied are both 2.
1198 leaderActorContext.setReplicatedLog(
1199 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1200 long leaderCommitIndex = 2;
1201 leaderActorContext.setCommitIndex(leaderCommitIndex);
1202 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the entries at indices 1 and 2 to compare against what the follower receives.
1204 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1205 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1207 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// Follower log built with createEntries(0, 1, 1); commit index and last applied are both 0.
1209 followerActorContext.setReplicatedLog(
1210 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1211 followerActorContext.setCommitIndex(0);
1212 followerActorContext.setLastApplied(0);
1214 Follower follower = new Follower(followerActorContext);
1215 followerActor.underlyingActor().setBehavior(follower);
1217 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply, then clear both collectors.
1219 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1220 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1222 MessageCollectorActor.clearMessages(followerActor);
1223 MessageCollectorActor.clearMessages(leaderActor);
1225 // Verify initial AppendEntries sent with the leader's current commit index.
1226 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1227 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1228 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
// Install the Leader behavior on the leader actor, then hand it the captured reply.
1230 leaderActor.underlyingActor().setBehavior(leader);
1232 leader.handleMessage(followerActor, appendEntriesReply);
// One more reply is expected at the leader actor, and a new AppendEntries at the follower.
1234 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1235 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1237 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1238 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1239 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1241 assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1242 assertEquals("First entry data", leadersSecondLogEntry.getData(),
1243 appendEntries.getEntries().get(0).getData());
1244 assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1245 assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1246 appendEntries.getEntries().get(1).getData());
// The leader's tracked next index for the follower is expected to be 3.
1248 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1249 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
// The follower actor is expected to have collected two ApplyState messages, for indices 1 and 2.
1251 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1253 ApplyState applyState = applyStateList.get(0);
1254 assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1255 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1256 assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1257 applyState.getReplicatedLogEntry().getData());
1259 applyState = applyStateList.get(1);
1260 assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1261 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1262 assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1263 applyState.getReplicatedLogEntry().getData());
1265 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1266 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
/**
 * Follower starts with an empty log. After the leader handles the follower's reply to the initial
 * AppendEntries (presumably a failure reply, per the test name), it is expected to send the entries at
 * indices 0 and 1, and the follower is expected to apply them and end with commit index and last index 1.
 */
1270 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1271 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1273 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// 1000-second heartbeat interval (presumably so scheduled heartbeats do not interfere - TODO confirm).
1274 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1275 new FiniteDuration(1000, TimeUnit.SECONDS));
// Leader log built with createEntries(0, 2, 1); commit index and last applied are both 1.
1277 leaderActorContext.setReplicatedLog(
1278 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1279 long leaderCommitIndex = 1;
1280 leaderActorContext.setCommitIndex(leaderCommitIndex);
1281 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the entries at indices 0 and 1 to compare against what the follower receives.
1283 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1284 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1286 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// Follower log built without any createEntries call; commit index and last applied are both -1.
1288 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1289 followerActorContext.setCommitIndex(-1);
1290 followerActorContext.setLastApplied(-1);
1292 Follower follower = new Follower(followerActorContext);
1293 followerActor.underlyingActor().setBehavior(follower);
1295 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply, then clear both collectors.
1297 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1298 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1300 MessageCollectorActor.clearMessages(followerActor);
1301 MessageCollectorActor.clearMessages(leaderActor);
1303 // Verify initial AppendEntries sent with the leader's current commit index.
1304 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1305 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1306 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
// Install the Leader behavior on the leader actor, then hand it the captured reply.
1308 leaderActor.underlyingActor().setBehavior(leader);
1310 leader.handleMessage(followerActor, appendEntriesReply);
// One more reply is expected at the leader actor, and a new AppendEntries at the follower.
1312 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1313 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1315 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1316 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1317 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1319 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1320 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1321 appendEntries.getEntries().get(0).getData());
1322 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1323 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1324 appendEntries.getEntries().get(1).getData());
// The leader's tracked next index for the follower is expected to be 2.
1326 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1327 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
// The follower actor is expected to have collected two ApplyState messages, for indices 0 and 1.
1329 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1331 ApplyState applyState = applyStateList.get(0);
1332 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1333 assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1334 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1335 applyState.getReplicatedLogEntry().getData());
1337 applyState = applyStateList.get(1);
1338 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1339 assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1340 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1341 applyState.getReplicatedLogEntry().getData());
1343 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1344 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
/**
 * Follower's log was built with a different term argument (1) than the leader's (2). After the leader
 * handles the follower's reply to the initial AppendEntries (presumably a failure reply, per the test
 * name), it is expected to send its entries at indices 0 and 1 with term 2, and the follower is expected
 * to apply them and end with commit index 1, last index 1 and last term 2.
 */
1348 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1349 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1351 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// 1000-second heartbeat interval (presumably so scheduled heartbeats do not interfere - TODO confirm).
1352 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1353 new FiniteDuration(1000, TimeUnit.SECONDS));
// Leader log built with createEntries(0, 2, 2); commit index and last applied are both 1.
1355 leaderActorContext.setReplicatedLog(
1356 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1357 long leaderCommitIndex = 1;
1358 leaderActorContext.setCommitIndex(leaderCommitIndex);
1359 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep references to the entries at indices 0 and 1 to compare against what the follower receives.
1361 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1362 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1364 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// Follower log built with createEntries(0, 1, 1); commit index and last applied are both -1.
1366 followerActorContext.setReplicatedLog(
1367 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1368 followerActorContext.setCommitIndex(-1);
1369 followerActorContext.setLastApplied(-1);
1371 Follower follower = new Follower(followerActorContext);
1372 followerActor.underlyingActor().setBehavior(follower);
1374 leader = new Leader(leaderActorContext);
// Capture the initial AppendEntries and the follower's reply, then clear both collectors.
1376 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1377 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1379 MessageCollectorActor.clearMessages(followerActor);
1380 MessageCollectorActor.clearMessages(leaderActor);
1382 // Verify initial AppendEntries sent with the leader's current commit index.
1383 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1384 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1385 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
// Install the Leader behavior on the leader actor, then hand it the captured reply.
1387 leaderActor.underlyingActor().setBehavior(leader);
1389 leader.handleMessage(followerActor, appendEntriesReply);
// One more reply is expected at the leader actor, and a new AppendEntries at the follower.
1391 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1392 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1394 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1395 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1396 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1398 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1399 assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1400 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1401 appendEntries.getEntries().get(0).getData());
1402 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1403 assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1404 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1405 appendEntries.getEntries().get(1).getData());
// The leader's tracked next index for the follower is expected to be 2.
1407 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1408 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
// The follower actor is expected to have collected two ApplyState messages, both with term 2.
1410 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1412 ApplyState applyState = applyStateList.get(0);
1413 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1414 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1415 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1416 applyState.getReplicatedLogEntry().getData());
1418 applyState = applyStateList.get(1);
1419 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1420 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1421 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1422 applyState.getReplicatedLogEntry().getData());
1424 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1425 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1426 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
/**
 * Sends an unsuccessful AppendEntriesReply to the leader actor and verifies that the first behavior change
 * recorded by the actor is to RaftState.Follower.
 */
1430 public void testHandleAppendEntriesReplyWithNewerTerm(){
1431 logStart("testHandleAppendEntriesReplyWithNewerTerm");
1433 MockRaftActorContext leaderActorContext = createActorContext();
// 10000-second heartbeat interval (presumably so scheduled heartbeats do not interfere - TODO confirm).
1434 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1435 new FiniteDuration(10000, TimeUnit.SECONDS));
1437 leaderActorContext.setReplicatedLog(
1438 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1440 leader = new Leader(leaderActorContext);
1441 leaderActor.underlyingActor().setBehavior(leader);
// Reply from "foo" with success == false; presumably 20 is the (newer) term, per the test name - TODO confirm argument order.
1442 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
// Wait until the leader actor has collected the reply before checking the behavior change.
1444 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1446 assertEquals(false, appendEntriesReply.isSuccess());
1447 assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1449 MessageCollectorActor.clearMessages(leaderActor);
/**
 * Same scenario as the newer-term reply test, but with a raft policy from createRaftPolicy(false, false)
 * installed; here the first behavior change recorded by the leader actor is expected to be
 * RaftState.Leader.
 */
1453 public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1454 logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1456 MockRaftActorContext leaderActorContext = createActorContext();
// 10000-second heartbeat interval (presumably so scheduled heartbeats do not interfere - TODO confirm).
1457 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1458 new FiniteDuration(10000, TimeUnit.SECONDS));
1460 leaderActorContext.setReplicatedLog(
1461 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
// Presumably the first flag disables automatic elections, per the test name - TODO confirm against createRaftPolicy.
1462 leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1464 leader = new Leader(leaderActorContext);
1465 leaderActor.underlyingActor().setBehavior(leader);
// Reply from "foo" with success == false; presumably 20 is the (newer) term, per the test name - TODO confirm argument order.
1466 leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
// Wait until the leader actor has collected the reply before checking the behavior change.
1468 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1470 assertEquals(false, appendEntriesReply.isSuccess());
1471 assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1473 MessageCollectorActor.clearMessages(leaderActor);
/**
 * Handles a successful AppendEntriesReply from the follower and verifies that the leader stays Leader,
 * its commit index and last applied move from 1 to 2, an ApplyJournalEntries and a single ApplyState for
 * index 2 are collected at the leader actor, and the follower's payload version from the reply is recorded.
 */
1477 public void testHandleAppendEntriesReplySuccess() throws Exception {
1478 logStart("testHandleAppendEntriesReplySuccess");
1480 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1482 leaderActorContext.setReplicatedLog(
1483 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
// Start with commit index and last applied at 1, and the current term at 1.
1485 leaderActorContext.setCommitIndex(1);
1486 leaderActorContext.setLastApplied(1);
1487 leaderActorContext.getTermInformation().update(1, "leader");
1489 leader = new Leader(leaderActorContext);
// This refers to the test class's payloadVersion field; the local declared on the next line shadows it from there on.
1491 assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1493 short payloadVersion = 5;
// Successful reply (third argument true) carrying the local payload version 5.
1494 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1496 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1498 assertEquals(RaftState.Leader, raftActorBehavior.state());
1500 assertEquals(2, leaderActorContext.getCommitIndex());
1502 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1503 leaderActor, ApplyJournalEntries.class);
1505 assertEquals(2, leaderActorContext.getLastApplied());
1507 assertEquals(2, applyJournalEntries.getToIndex());
// Exactly one ApplyState is expected, for the entry at index 2.
1509 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1512 assertEquals(1,applyStateList.size());
1514 ApplyState applyState = applyStateList.get(0);
1516 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
// The payload version from the reply (5) is expected on the leader's record of the follower.
1518 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1519 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
/**
 * Verifies that an AppendEntriesReply whose follower id is not among the leader's peers (the context is
 * created without any follower) leaves the behavior in RaftState.Leader.
 */
1523 public void testHandleAppendEntriesReplyUnknownFollower(){
1524 logStart("testHandleAppendEntriesReplyUnknownFollower");
1526 MockRaftActorContext leaderActorContext = createActorContext();
1528 leader = new Leader(leaderActorContext);
// Unsuccessful reply from an id that was never registered as a peer.
1530 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1532 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1534 assertEquals(RaftState.Leader, raftActorBehavior.state());
1538 public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1539 logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1541 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1542 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1543 new FiniteDuration(1000, TimeUnit.SECONDS));
1544 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2);
1546 leaderActorContext.setReplicatedLog(
1547 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1548 long leaderCommitIndex = 3;
1549 leaderActorContext.setCommitIndex(leaderCommitIndex);
1550 leaderActorContext.setLastApplied(leaderCommitIndex);
1552 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1553 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1554 ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1555 ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1557 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1559 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1560 followerActorContext.setCommitIndex(-1);
1561 followerActorContext.setLastApplied(-1);
1563 Follower follower = new Follower(followerActorContext);
1564 followerActor.underlyingActor().setBehavior(follower);
1566 leader = new Leader(leaderActorContext);
1568 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1569 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1571 MessageCollectorActor.clearMessages(followerActor);
1572 MessageCollectorActor.clearMessages(leaderActor);
1574 // Verify initial AppendEntries sent with the leader's current commit index.
1575 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1576 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1577 assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1579 leaderActor.underlyingActor().setBehavior(leader);
1581 leader.handleMessage(followerActor, appendEntriesReply);
1583 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1584 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1586 appendEntries = appendEntriesList.get(0);
1587 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1588 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1589 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1591 assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1592 assertEquals("First entry data", leadersFirstLogEntry.getData(),
1593 appendEntries.getEntries().get(0).getData());
1594 assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1595 assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1596 appendEntries.getEntries().get(1).getData());
1598 appendEntries = appendEntriesList.get(1);
1599 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1600 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1601 assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1603 assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1604 assertEquals("First entry data", leadersThirdLogEntry.getData(),
1605 appendEntries.getEntries().get(0).getData());
1606 assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1607 assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1608 appendEntries.getEntries().get(1).getData());
1610 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1611 assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1613 MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1615 assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1616 assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1620 public void testHandleRequestVoteReply(){
1621 logStart("testHandleRequestVoteReply");
1623 MockRaftActorContext leaderActorContext = createActorContext();
1625 leader = new Leader(leaderActorContext);
1627 // Should be a no-op.
1628 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1629 new RequestVoteReply(1, true));
1631 assertEquals(RaftState.Leader, raftActorBehavior.state());
1633 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1635 assertEquals(RaftState.Leader, raftActorBehavior.state());
1639 public void testIsolatedLeaderCheckNoFollowers() {
1640 logStart("testIsolatedLeaderCheckNoFollowers");
1642 MockRaftActorContext leaderActorContext = createActorContext();
1644 leader = new Leader(leaderActorContext);
1645 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1646 Assert.assertTrue(behavior instanceof Leader);
1649 private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1650 ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1651 ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1653 MockRaftActorContext leaderActorContext = createActorContext();
1655 Map<String, String> peerAddresses = new HashMap<>();
1656 peerAddresses.put("follower-1", followerActor1.path().toString());
1657 peerAddresses.put("follower-2", followerActor2.path().toString());
1659 leaderActorContext.setPeerAddresses(peerAddresses);
1660 leaderActorContext.setRaftPolicy(raftPolicy);
1662 leader = new Leader(leaderActorContext);
1664 leader.markFollowerActive("follower-1");
1665 leader.markFollowerActive("follower-2");
1666 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1667 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1668 behavior instanceof Leader);
1670 // kill 1 follower and verify if that got killed
1671 final JavaTestKit probe = new JavaTestKit(getSystem());
1672 probe.watch(followerActor1);
1673 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1674 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1675 assertEquals(termMsg1.getActor(), followerActor1);
1677 leader.markFollowerInActive("follower-1");
1678 leader.markFollowerActive("follower-2");
1679 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1680 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1681 behavior instanceof Leader);
1683 // kill 2nd follower and leader should change to Isolated leader
1684 followerActor2.tell(PoisonPill.getInstance(), null);
1685 probe.watch(followerActor2);
1686 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1687 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1688 assertEquals(termMsg2.getActor(), followerActor2);
1690 leader.markFollowerInActive("follower-2");
1691 return leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1695 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1696 logStart("testIsolatedLeaderCheckTwoFollowers");
1698 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1700 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1701 behavior instanceof IsolatedLeader);
1705 public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1706 logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1708 RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1710 Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1711 behavior instanceof Leader);
1715 public void testLaggingFollowerStarvation() throws Exception {
1716 logStart("testLaggingFollowerStarvation");
1717 new JavaTestKit(getSystem()) {{
1718 String leaderActorId = actorFactory.generateActorId("leader");
1719 String follower1ActorId = actorFactory.generateActorId("follower");
1720 String follower2ActorId = actorFactory.generateActorId("follower");
1722 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1723 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1724 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1725 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1727 MockRaftActorContext leaderActorContext =
1728 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1730 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1731 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1732 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1734 leaderActorContext.setConfigParams(configParams);
1736 leaderActorContext.setReplicatedLog(
1737 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1739 Map<String, String> peerAddresses = new HashMap<>();
1740 peerAddresses.put(follower1ActorId,
1741 follower1Actor.path().toString());
1742 peerAddresses.put(follower2ActorId,
1743 follower2Actor.path().toString());
1745 leaderActorContext.setPeerAddresses(peerAddresses);
1746 leaderActorContext.getTermInformation().update(1, leaderActorId);
1748 RaftActorBehavior leader = createBehavior(leaderActorContext);
1750 leaderActor.underlyingActor().setBehavior(leader);
1752 for(int i=1;i<6;i++) {
1753 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1754 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1755 assertTrue(newBehavior == leader);
1756 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1759 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1760 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1762 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1763 heartbeats.size() > 1);
1765 // Check if follower-2 got AppendEntries during this time and was not starved
1766 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1768 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1769 appendEntries.size() > 1);
1775 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1776 ActorRef actorRef, RaftRPC rpc) throws Exception {
1777 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1778 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1781 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1783 private final long electionTimeOutIntervalMillis;
1784 private final int snapshotChunkSize;
1786 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1788 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1789 this.snapshotChunkSize = snapshotChunkSize;
1793 public FiniteDuration getElectionTimeOutInterval() {
1794 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1798 public int getSnapshotChunkSize() {
1799 return snapshotChunkSize;