1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
29 import org.opendaylight.controller.cluster.raft.SerializationUtils;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
34 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
37 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
40 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import scala.concurrent.duration.FiniteDuration;
/**
 * Tests for the {@link Leader} behavior. Each test builds a {@code Leader} around a mock actor
 * context and inspects the messages that end up at the leader and follower test actors.
 */
48 public class LeaderTest extends AbstractLeaderTest {
// Follower id used when building the AppendEntriesReply / InstallSnapshotReply messages below.
50 static final String FOLLOWER_ID = "follower";
// Test actor passed as the sender of leader-side messages (SendHeartBeat, Replicate, ...).
// NOTE(review): 'actorFactory' is not declared in this excerpt; presumably inherited - confirm.
52 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
53 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
// Test actor standing in for the follower; the tests look up the messages it collected.
55 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
56 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
// Behavior under test; assigned anew by every test method.
58 private Leader leader;
// Per-test cleanup hook; its body is not visible in this excerpt.
// NOTE(review): presumably annotated with @After (org.junit.After is imported) - confirm.
62 public void tearDown() throws Exception {
/**
 * Checks that handing the Leader a message it has no handler for (the plain string "foo")
 * returns a behavior that is still a {@link Leader}.
 */
71 public void testHandleMessageForUnknownMessage() throws Exception {
72 logStart("testHandleMessageForUnknownMessage");
74 leader = new Leader(createActorContext());
76 // handle message should return the Leader state when it receives an
// unknown message; a bare String is used as that message here.
78 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
79 Assert.assertTrue(behavior instanceof Leader);
/**
 * Verifies the AppendEntries messages a Leader sends to its follower: first the one observed
 * right after construction (asserted to have prevLogIndex/prevLogTerm of -1 and no entries),
 * then the one triggered by an explicit SendHeartBeat after the follower has acknowledged
 * {@code lastIndex - 1} (asserted to carry exactly the entry at {@code lastIndex}).
 */
83 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
84 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
86 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): the declaration of 'term' is not visible in this excerpt.
89 actorContext.getTermInformation().update(term, "");
91 leader = new Leader(actorContext);
93 // Leader should send an immediate heartbeat with no entries as follower is inactive.
94 long lastIndex = actorContext.getReplicatedLog().lastIndex();
95 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
96 assertEquals("getTerm", term, appendEntries.getTerm());
97 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
98 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
99 assertEquals("Entries size", 0, appendEntries.getEntries().size());
101 // The follower would normally reply - simulate that explicitly here.
102 leader.handleMessage(followerActor, new AppendEntriesReply(
103 FOLLOWER_ID, term, true, lastIndex - 1, term));
104 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Presumably drops the messages collected so far, so the lookup below only sees new ones.
106 followerActor.underlyingActor().clear();
108 // Sleep for the heartbeat interval so AppendEntries is sent.
109 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
110 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
112 leader.handleMessage(leaderActor, new SendHeartBeat());
// The reply above acknowledged lastIndex - 1, so the one outstanding entry is expected here.
114 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
115 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
116 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
117 assertEquals("Entries size", 1, appendEntries.getEntries().size());
118 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
119 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
/**
 * Builds a mock log entry, appends it to the replicated log of the given context and hands the
 * Leader a {@link Replicate} for it, with {@code leaderActor} as the sender. The first two
 * Replicate arguments are passed as null.
 *
 * @param actorContext context whose replicated log receives the new entry
 * @param index NOTE(review): presumably the log index of the new entry - the constructor
 *              arguments are not visible in this excerpt; confirm
 * @return the behavior returned by {@code leader.handleMessage}
 */
123 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
124 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
125 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
127 actorContext.getReplicatedLog().append(newEntry);
128 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
/**
 * Verifies that, once the follower has acknowledged the leader's last log index, a Replicate
 * for a new entry results in an AppendEntries to the follower that carries exactly that entry
 * (index {@code lastIndex + 1}, payload "foo") and that the behavior stays a {@link Leader}.
 */
132 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
133 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
135 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): the declaration of 'term' is not visible in this excerpt.
138 actorContext.getTermInformation().update(term, "");
140 leader = new Leader(actorContext);
142 // Leader will send an immediate heartbeat - ignore it.
143 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
145 // The follower would normally reply - simulate that explicitly here.
146 long lastIndex = actorContext.getReplicatedLog().lastIndex();
147 leader.handleMessage(followerActor, new AppendEntriesReply(
148 FOLLOWER_ID, term, true, lastIndex, term));
149 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
151 followerActor.underlyingActor().clear();
// NOTE(review): sendReplicate() below builds and appends its own entry as well, so the log is
// appended to twice in this test - confirm whether this explicit append is still needed.
153 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
154 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
155 1, lastIndex + 1, payload);
156 actorContext.getReplicatedLog().append(newEntry);
157 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
159 // State should not change
160 assertTrue(raftBehavior instanceof Leader);
162 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
163 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
164 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
165 assertEquals("Entries size", 1, appendEntries.getEntries().size());
166 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
167 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
168 assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
/**
 * Verifies that five Replicate messages sent back-to-back, with no AppendEntriesReply in
 * between and a heartbeat interval that cannot elapse during the test, result in only one
 * AppendEntries reaching the follower.
 */
172 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
// Log under this test's own name (previously another test's name was logged here).
173 logStart("testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent");
175 MockRaftActorContext actorContext = createActorContextWithFollower();
// Heartbeat interval of 5 seconds so it does not expire while the replicates are sent.
176 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
178 public FiniteDuration getHeartBeatInterval() {
179 return FiniteDuration.apply(5, TimeUnit.SECONDS);
// NOTE(review): the declaration of 'term' is not visible in this excerpt.
184 actorContext.getTermInformation().update(term, "");
186 leader = new Leader(actorContext);
188 // Leader will send an immediate heartbeat - ignore it.
189 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
191 // The follower would normally reply - simulate that explicitly here.
192 long lastIndex = actorContext.getReplicatedLog().lastIndex();
193 leader.handleMessage(followerActor, new AppendEntriesReply(
194 FOLLOWER_ID, term, true, lastIndex, term));
195 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
197 followerActor.underlyingActor().clear();
199 for(int i=0;i<5;i++) {
200 sendReplicate(actorContext, lastIndex+i+1);
203 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
204 // We expect only 1 message to be sent because of two reasons,
205 // - an append entries reply was not received
206 // - the heartbeat interval has not expired
207 // In this scenario if multiple messages are sent they would likely be duplicates
208 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
/**
 * Verifies that a Replicate is forwarded to the follower whenever the previous one has been
 * acknowledged: three replicate/reply rounds followed by two unacknowledged replicates are
 * expected to yield four AppendEntries, whose first entries have indexes 2, 3, 4 and 5.
 */
212 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
213 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
215 MockRaftActorContext actorContext = createActorContextWithFollower();
// Heartbeat interval of 5 seconds so no heartbeat interval elapses during the test.
216 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
218 public FiniteDuration getHeartBeatInterval() {
219 return FiniteDuration.apply(5, TimeUnit.SECONDS);
// NOTE(review): the declaration of 'term' is not visible in this excerpt.
224 actorContext.getTermInformation().update(term, "");
226 leader = new Leader(actorContext);
228 // Leader will send an immediate heartbeat - ignore it.
229 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
231 // The follower would normally reply - simulate that explicitly here.
232 long lastIndex = actorContext.getReplicatedLog().lastIndex();
233 leader.handleMessage(followerActor, new AppendEntriesReply(
234 FOLLOWER_ID, term, true, lastIndex, term));
235 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
237 followerActor.underlyingActor().clear();
// Three rounds in which every replicate is acknowledged by the follower.
239 for(int i=0;i<3;i++) {
240 sendReplicate(actorContext, lastIndex+i+1);
241 leader.handleMessage(followerActor, new AppendEntriesReply(
242 FOLLOWER_ID, term, true, lastIndex + i + 1, term));
// Two further replicates without any reply from the follower.
246 for(int i=3;i<5;i++) {
247 sendReplicate(actorContext, lastIndex + i + 1);
250 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
251 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
252 // get sent to the follower - but not the 5th
253 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
255 for(int i=0;i<4;i++) {
// JUnit's assertEquals takes (expected, actual); the observed index goes second so that a
// failure message reports the two values the right way round.
256 long actual = allMessages.get(i).getEntries().get(0).getIndex();
257 assertEquals(i+2, actual);
/**
 * Verifies that an entry which was replicated but never acknowledged is sent again on the
 * next heartbeat: after one Replicate and one SendHeartBeat (issued once the 500 ms heartbeat
 * interval has passed) the follower is expected to hold two AppendEntries, each carrying the
 * single entry at {@code lastIndex + 1}.
 */
262 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
263 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
265 MockRaftActorContext actorContext = createActorContextWithFollower();
266 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
268 public FiniteDuration getHeartBeatInterval() {
269 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
// NOTE(review): the declaration of 'term' is not visible in this excerpt.
274 actorContext.getTermInformation().update(term, "");
276 leader = new Leader(actorContext);
278 // Leader will send an immediate heartbeat - ignore it.
279 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
281 // The follower would normally reply - simulate that explicitly here.
282 long lastIndex = actorContext.getReplicatedLog().lastIndex();
283 leader.handleMessage(followerActor, new AppendEntriesReply(
284 FOLLOWER_ID, term, true, lastIndex, term));
285 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
287 followerActor.underlyingActor().clear();
289 sendReplicate(actorContext, lastIndex+1);
291 // Wait slightly longer than heartbeat duration
292 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
294 leader.handleMessage(leaderActor, new SendHeartBeat());
296 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
297 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
299 assertEquals(1, allMessages.get(0).getEntries().size());
300 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
301 assertEquals(1, allMessages.get(1).getEntries().size());
// Check the second message here; the first one is already covered two lines above.
302 assertEquals(lastIndex+1, allMessages.get(1).getEntries().get(0).getIndex());
/**
 * Verifies that every SendHeartBeat handled after the heartbeat interval (100 ms here) has
 * passed produces an AppendEntries: three sleep-then-heartbeat rounds are expected to leave
 * three AppendEntries at the follower.
 */
307 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
308 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
310 MockRaftActorContext actorContext = createActorContextWithFollower();
311 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
313 public FiniteDuration getHeartBeatInterval() {
314 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
// NOTE(review): the declaration of 'term' is not visible in this excerpt.
319 actorContext.getTermInformation().update(term, "");
321 leader = new Leader(actorContext);
323 // Leader will send an immediate heartbeat - ignore it.
324 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
326 // The follower would normally reply - simulate that explicitly here.
327 long lastIndex = actorContext.getReplicatedLog().lastIndex();
328 leader.handleMessage(followerActor, new AppendEntriesReply(
329 FOLLOWER_ID, term, true, lastIndex, term));
330 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
332 followerActor.underlyingActor().clear();
// Each round waits 150 ms, i.e. longer than the 100 ms interval, before the heartbeat.
334 for(int i=0;i<3;i++) {
335 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
336 leader.handleMessage(leaderActor, new SendHeartBeat());
339 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
340 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
/**
 * Verifies that a Replicate handled directly after a heartbeat is still sent to the follower:
 * the follower is expected to hold two AppendEntries, the first without entries (the
 * heartbeat) and the second with one entry (the replicate).
 */
344 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
345 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
347 MockRaftActorContext actorContext = createActorContextWithFollower();
348 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
350 public FiniteDuration getHeartBeatInterval() {
351 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
// NOTE(review): the declaration of 'term' is not visible in this excerpt.
356 actorContext.getTermInformation().update(term, "");
358 leader = new Leader(actorContext);
360 // Leader will send an immediate heartbeat - ignore it.
361 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
363 // The follower would normally reply - simulate that explicitly here.
364 long lastIndex = actorContext.getReplicatedLog().lastIndex();
365 leader.handleMessage(followerActor, new AppendEntriesReply(
366 FOLLOWER_ID, term, true, lastIndex, term));
367 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
369 followerActor.underlyingActor().clear();
// Wait longer than the 100 ms interval, then send the heartbeat and the replicate back-to-back.
371 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
372 leader.handleMessage(leaderActor, new SendHeartBeat());
373 sendReplicate(actorContext, lastIndex+1);
375 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
376 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
378 assertEquals(0, allMessages.get(0).getEntries().size());
379 assertEquals(1, allMessages.get(1).getEntries().size());
/**
 * Verifies Replicate handling for a Leader built from a context without followers: the commit
 * index is expected to advance to the new entry's index, and the leader actor is expected to
 * receive one ApplyState per log index from 1 up to the new index, the last one carrying the
 * new entry's data and the "state-id" identifier.
 */
384 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
385 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
387 MockRaftActorContext actorContext = createActorContext();
389 leader = new Leader(actorContext);
391 actorContext.setLastApplied(0);
393 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
394 long term = actorContext.getTermInformation().getCurrentTerm();
395 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
396 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
398 actorContext.getReplicatedLog().append(newEntry);
400 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
401 new Replicate(leaderActor, "state-id", newEntry));
403 // State should not change
404 assertTrue(raftBehavior instanceof Leader);
406 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
408 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
409 // one since lastApplied state is 0.
410 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
411 leaderActor, ApplyState.class);
412 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
// The i-th ApplyState is expected to refer to log index i + 1.
414 for(int i = 0; i <= newLogIndex - 1; i++ ) {
415 ApplyState applyState = applyStateList.get(i);
416 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
417 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
420 ApplyState last = applyStateList.get((int) newLogIndex - 1);
421 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
422 assertEquals("getIdentifier", "state-id", last.getIdentifier());
/**
 * Verifies heartbeat handling while a snapshot install is registered for the follower: with a
 * chunk outstanding, a SendHeartBeat is expected to produce an AppendEntries without entries;
 * once the chunk is marked as sent successfully, the next SendHeartBeat is expected to
 * produce an InstallSnapshot whose last included index is the snapshot index.
 */
426 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
427 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
429 MockRaftActorContext actorContext = createActorContextWithFollower();
431 Map<String, String> leadersSnapshot = new HashMap<>();
432 leadersSnapshot.put("1", "A");
433 leadersSnapshot.put("2", "B");
434 leadersSnapshot.put("3", "C");
// Remove the log entries from index 0 onwards.
437 actorContext.getReplicatedLog().removeFrom(0);
439 final int followersLastIndex = 2;
440 final int snapshotIndex = 3;
441 final int newEntryIndex = 4;
442 final int snapshotTerm = 1;
443 final int currentTerm = 2;
445 // set the snapshot variables in replicatedlog
446 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
447 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
448 actorContext.setCommitIndex(followersLastIndex);
449 //set follower timeout to 2 mins, helps during debugging
450 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
452 leader = new Leader(actorContext);
// NOTE(review): 'entry' is not referenced again in the lines visible in this excerpt.
455 ReplicatedLogImplEntry entry =
456 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
457 new MockRaftActorContext.MockPayload("D"));
459 //update follower timestamp
460 leader.markFollowerActive(FOLLOWER_ID);
// Register the serialized snapshot with the leader and as the follower's snapshot transfer.
462 ByteString bs = toByteString(leadersSnapshot);
463 leader.setSnapshot(Optional.of(bs));
464 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
465 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
467 //send first chunk and no InstallSnapshotReply received yet
469 fts.incrementChunkIndex();
471 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
472 TimeUnit.MILLISECONDS);
474 leader.handleMessage(leaderActor, new SendHeartBeat());
476 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
478 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
480 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
482 //InstallSnapshotReply received
483 fts.markSendStatus(true);
485 leader.handleMessage(leaderActor, new SendHeartBeat());
487 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
489 assertEquals(snapshotIndex, is.getLastIncludedIndex());
/**
 * Verifies that a Replicate for a new entry, handled while the replicated log's snapshot index
 * (3) is ahead of the commit index set for the follower scenario (2), keeps the behavior a
 * {@link Leader} and causes a CaptureSnapshot message to reach the leader actor.
 */
493 public void testSendAppendEntriesSnapshotScenario() throws Exception {
494 logStart("testSendAppendEntriesSnapshotScenario");
496 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): 'leadersSnapshot' is not referenced again in the lines visible in this excerpt.
498 Map<String, String> leadersSnapshot = new HashMap<>();
499 leadersSnapshot.put("1", "A");
500 leadersSnapshot.put("2", "B");
501 leadersSnapshot.put("3", "C");
// Remove the log entries from index 0 onwards.
504 actorContext.getReplicatedLog().removeFrom(0);
506 final int followersLastIndex = 2;
507 final int snapshotIndex = 3;
508 final int newEntryIndex = 4;
509 final int snapshotTerm = 1;
510 final int currentTerm = 2;
512 // set the snapshot variables in replicatedlog
513 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
514 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
515 actorContext.setCommitIndex(followersLastIndex);
517 leader = new Leader(actorContext);
519 // Leader will send an immediate heartbeat - ignore it.
520 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
523 ReplicatedLogImplEntry entry =
524 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
525 new MockRaftActorContext.MockPayload("D"));
527 actorContext.getReplicatedLog().append(entry);
529 //update follower timestamp
530 leader.markFollowerActive(FOLLOWER_ID);
532 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
533 RaftActorBehavior raftBehavior = leader.handleMessage(
534 leaderActor, new Replicate(null, "state-id", entry));
536 assertTrue(raftBehavior instanceof Leader);
538 MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
/**
 * Verifies that, with no snapshot set on the leader, a Replicate causes a CaptureSnapshot with
 * install-snapshot initiated and the expected last-applied (3 / term 1) and last (4 / term 2)
 * index and term, and that a second Replicate while the first capture is pending does not add
 * another CaptureSnapshot.
 */
542 public void testInitiateInstallSnapshot() throws Exception {
543 logStart("testInitiateInstallSnapshot");
545 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): 'leadersSnapshot' is not referenced again in the lines visible in this excerpt.
547 Map<String, String> leadersSnapshot = new HashMap<>();
548 leadersSnapshot.put("1", "A");
549 leadersSnapshot.put("2", "B");
550 leadersSnapshot.put("3", "C");
// Remove the log entries from index 0 onwards.
553 actorContext.getReplicatedLog().removeFrom(0);
555 final int followersLastIndex = 2;
556 final int snapshotIndex = 3;
557 final int newEntryIndex = 4;
558 final int snapshotTerm = 1;
559 final int currentTerm = 2;
561 // set the snapshot variables in replicatedlog
562 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
563 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
564 actorContext.setLastApplied(3);
565 actorContext.setCommitIndex(followersLastIndex);
567 leader = new Leader(actorContext);
569 // Leader will send an immediate heartbeat - ignore it.
570 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
572 // set the snapshot as absent and check if capture-snapshot is invoked.
573 leader.setSnapshot(Optional.<ByteString>absent());
576 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
577 new MockRaftActorContext.MockPayload("D"));
579 actorContext.getReplicatedLog().append(entry);
581 //update follower timestamp
582 leader.markFollowerActive(FOLLOWER_ID);
584 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
586 CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
// Expected values mirror the setup above: lastApplied 3 / snapshotTerm 1, new entry 4 / term 2.
588 assertTrue(cs.isInstallSnapshotInitiated());
589 assertEquals(3, cs.getLastAppliedIndex());
590 assertEquals(1, cs.getLastAppliedTerm());
591 assertEquals(4, cs.getLastIndex());
592 assertEquals(2, cs.getLastTerm());
594 // if an initiate is started again while the first is in progress, it shouldn't initiate another capture
595 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
597 List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
598 assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
/**
 * Verifies that handling a SendInstallSnapshot keeps the behavior a {@link Leader} and sends
 * the follower an InstallSnapshot with non-null data, the log's snapshot index and term as the
 * last included index and term, and the current term.
 */
602 public void testInstallSnapshot() throws Exception {
603 logStart("testInstallSnapshot");
605 MockRaftActorContext actorContext = createActorContextWithFollower();
607 Map<String, String> leadersSnapshot = new HashMap<>();
608 leadersSnapshot.put("1", "A");
609 leadersSnapshot.put("2", "B");
610 leadersSnapshot.put("3", "C");
// Remove the log entries from index 0 onwards.
613 actorContext.getReplicatedLog().removeFrom(0);
615 final int followersLastIndex = 2;
616 final int snapshotIndex = 3;
617 final int snapshotTerm = 1;
618 final int currentTerm = 2;
620 // set the snapshot variables in replicatedlog
621 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
622 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
623 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
624 actorContext.setCommitIndex(followersLastIndex);
626 leader = new Leader(actorContext);
628 // Ignore initial heartbeat.
629 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
631 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
632 new SendInstallSnapshot(toByteString(leadersSnapshot)));
634 assertTrue(raftBehavior instanceof Leader);
636 // check if installsnapshot gets called with the correct values.
638 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
640 assertNotNull(installSnapshot.getData());
641 assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
642 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
644 assertEquals(currentTerm, installSnapshot.getTerm());
/**
 * Verifies the leader's bookkeeping after a successful InstallSnapshotReply for the last
 * chunk: the behavior stays a {@link Leader}, no follower snapshot transfer remains, one
 * follower log entry is tracked, and the follower's match index / next index are the snapshot
 * index and the snapshot index plus one.
 */
648 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
649 logStart("testHandleInstallSnapshotReplyLastChunk");
651 MockRaftActorContext actorContext = createActorContextWithFollower();
653 final int followersLastIndex = 2;
654 final int snapshotIndex = 3;
655 final int snapshotTerm = 1;
656 final int currentTerm = 2;
658 actorContext.setCommitIndex(followersLastIndex);
660 leader = new Leader(actorContext);
662 // Ignore initial heartbeat.
663 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
665 Map<String, String> leadersSnapshot = new HashMap<>();
666 leadersSnapshot.put("1", "A");
667 leadersSnapshot.put("2", "B");
668 leadersSnapshot.put("3", "C");
670 // set the snapshot variables in replicatedlog
672 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
673 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
674 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
676 ByteString bs = toByteString(leadersSnapshot);
677 leader.setSnapshot(Optional.of(bs));
678 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
679 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
// Advance the transfer until its current chunk index is the last chunk.
680 while(!fts.isLastChunk(fts.getChunkIndex())) {
682 fts.incrementChunkIndex();
// Remove the log entries from index 0 onwards.
686 actorContext.getReplicatedLog().removeFrom(0);
688 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
689 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
691 assertTrue(raftBehavior instanceof Leader);
693 assertEquals(0, leader.followerSnapshotSize());
694 assertEquals(1, leader.followerLogSize());
695 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
// The match index is asserted once; a verbatim duplicate of this assertion was removed.
697 assertEquals(snapshotIndex, fli.getMatchIndex());
699 assertEquals(snapshotIndex + 1, fli.getNextIndex());
/**
 * Verifies chunk-by-chunk snapshot transfer driven by InstallSnapshotReply messages: after
 * SendInstallSnapshot the follower is expected to get chunk 1 of 3, each successful reply is
 * expected to trigger the next chunk, and a reply to the final chunk is expected to trigger
 * no further InstallSnapshot.
 */
703 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
704 logStart("testSendSnapshotfromInstallSnapshotReply");
706 MockRaftActorContext actorContext = createActorContextWithFollower();
708 final int followersLastIndex = 2;
709 final int snapshotIndex = 3;
710 final int snapshotTerm = 1;
711 final int currentTerm = 2;
// NOTE(review): the overridden chunk size is not visible in this excerpt; the assertions below
// expect it to split the snapshot into 3 chunks.
713 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
715 public int getSnapshotChunkSize() {
// Intervals of 9 s and 10 s; presumably long enough that neither timer fires mid-test.
719 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
720 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
722 actorContext.setConfigParams(configParams);
723 actorContext.setCommitIndex(followersLastIndex);
725 leader = new Leader(actorContext);
727 Map<String, String> leadersSnapshot = new HashMap<>();
728 leadersSnapshot.put("1", "A");
729 leadersSnapshot.put("2", "B");
730 leadersSnapshot.put("3", "C");
732 // set the snapshot variables in replicatedlog
733 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
734 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
735 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
737 ByteString bs = toByteString(leadersSnapshot);
738 leader.setSnapshot(Optional.of(bs));
740 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
742 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
744 assertEquals(1, installSnapshot.getChunkIndex());
745 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 1; chunk 2 is expected next.
747 followerActor.underlyingActor().clear();
748 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
749 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
751 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
753 assertEquals(2, installSnapshot.getChunkIndex());
754 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 2; another InstallSnapshot is expected, but its chunk index is not asserted.
756 followerActor.underlyingActor().clear();
757 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
758 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
760 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
762 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
763 followerActor.underlyingActor().clear();
764 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
765 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
// getFirstMatching (unlike expectFirstMatching above) is asserted to return null here.
767 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
769 Assert.assertNull(installSnapshot);
/**
 * Verifies recovery from a failed InstallSnapshotReply carrying chunk index -1: after the
 * heartbeat interval and a SendHeartBeat, the follower is expected to be sent chunk 1 of 3
 * again.
 */
774 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
775 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
777 MockRaftActorContext actorContext = createActorContextWithFollower();
779 final int followersLastIndex = 2;
780 final int snapshotIndex = 3;
781 final int snapshotTerm = 1;
782 final int currentTerm = 2;
// NOTE(review): the overridden chunk size is not visible in this excerpt; the assertions below
// expect it to split the snapshot into 3 chunks.
784 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
786 public int getSnapshotChunkSize() {
791 actorContext.setCommitIndex(followersLastIndex);
793 leader = new Leader(actorContext);
795 Map<String, String> leadersSnapshot = new HashMap<>();
796 leadersSnapshot.put("1", "A");
797 leadersSnapshot.put("2", "B");
798 leadersSnapshot.put("3", "C");
800 // set the snapshot variables in replicatedlog
801 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
802 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
803 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
805 ByteString bs = toByteString(leadersSnapshot);
806 leader.setSnapshot(Optional.of(bs));
// NOTE(review): the reason for this fixed 1 s pause is not evident from the visible code.
808 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
809 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
811 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
813 assertEquals(1, installSnapshot.getChunkIndex());
814 assertEquals(3, installSnapshot.getTotalChunks());
816 followerActor.underlyingActor().clear();
// Simulate the follower rejecting the chunk: chunk index -1 with success == false.
818 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
819 FOLLOWER_ID, -1, false));
821 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
822 TimeUnit.MILLISECONDS);
824 leader.handleMessage(leaderActor, new SendHeartBeat());
// The first chunk is expected to be sent again.
826 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
828 assertEquals(1, installSnapshot.getChunkIndex());
829 assertEquals(3, installSnapshot.getTotalChunks());
// Checks the last-chunk hash code carried by consecutive InstallSnapshot messages: the first chunk
// carries AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, the second carries hashCode() of the first
// chunk's data.
// NOTE(review): this view seems to omit short lines (annotations, closing braces and the body of
// the getSnapshotChunkSize() override) - confirm against the full file.
833 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
834 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
836 MockRaftActorContext actorContext = createActorContextWithFollower();
838 final int followersLastIndex = 2;
839 final int snapshotIndex = 3;
840 final int snapshotTerm = 1;
841 final int currentTerm = 2;
// Replace the config with one that overrides getSnapshotChunkSize().
843 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
845 public int getSnapshotChunkSize() {
850 actorContext.setCommitIndex(followersLastIndex);
852 leader = new Leader(actorContext);
// Three-entry map that is serialized below and used as the leader's snapshot.
854 Map<String, String> leadersSnapshot = new HashMap<>();
855 leadersSnapshot.put("1", "A");
856 leadersSnapshot.put("2", "B");
857 leadersSnapshot.put("3", "C");
859 // set the snapshot variables in replicatedlog
860 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
861 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
862 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
864 ByteString bs = toByteString(leadersSnapshot);
865 leader.setSnapshot(Optional.of(bs));
867 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
// First chunk: index 1 of 3, carrying the initial hash code constant.
869 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
871 assertEquals(1, installSnapshot.getChunkIndex());
872 assertEquals(3, installSnapshot.getTotalChunks());
873 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
// Remember the hash of the first chunk's data for comparison with the next message.
875 int hashCode = installSnapshot.getData().hashCode();
877 followerActor.underlyingActor().clear();
// Acknowledge chunk 1 successfully so that the leader moves on to chunk 2.
879 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
880 FOLLOWER_ID, 1, true));
882 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
884 assertEquals(2, installSnapshot.getChunkIndex());
885 assertEquals(3, installSnapshot.getTotalChunks());
886 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
// Walks a FollowerToSnapshot over the serialized snapshot in steps of 50 bytes and checks, for each
// step, the size of the returned chunk and the reported chunk index; finally checks the total
// chunk count.
// NOTE(review): the declarations/updates of chunkIndex and j, the body of the
// getSnapshotChunkSize() override and the closing braces are not visible in this view - confirm
// against the full file.
890 public void testFollowerToSnapshotLogic() {
891 logStart("testFollowerToSnapshotLogic");
893 MockRaftActorContext actorContext = createActorContext();
895 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
897 public int getSnapshotChunkSize() {
902 leader = new Leader(actorContext);
904 Map<String, String> leadersSnapshot = new HashMap<>();
905 leadersSnapshot.put("1", "A");
906 leadersSnapshot.put("2", "B");
907 leadersSnapshot.put("3", "C");
909 ByteString bs = toByteString(leadersSnapshot);
910 byte[] barray = bs.toByteArray();
// FollowerToSnapshot is an inner class of the leader, hence the leader.new syntax.
912 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
913 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
915 assertEquals(bs.size(), barray.length);
918 for (int i=0; i < barray.length; i = i + 50) {
922 if (i + 50 > barray.length) {
926 ByteString chunk = fts.getNextChunk();
927 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
928 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
// Mark the chunk as sent and advance unless this was the last one.
930 fts.markSendStatus(true);
931 if (!fts.isLastChunk(chunkIndex)) {
932 fts.incrementChunkIndex();
936 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
// Supplies the behavior under test: a Leader built on the given context.
939 @Override protected RaftActorBehavior createBehavior(
940 RaftActorContext actorContext) {
941 return new Leader(actorContext);
// Creates a context backed by leaderActor by delegating to createActorContext(ActorRef).
945 protected MockRaftActorContext createActorContext() {
946 return createActorContext(leaderActor);
// Creates a context with the id "leader" for the given actor reference.
950 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
951 return createActorContext("leader", actorRef);
// Creates the default context and sets its peer addresses to a single entry that maps FOLLOWER_ID
// to the path of followerActor.
// NOTE(review): the return statement is not visible in this view.
954 private MockRaftActorContext createActorContextWithFollower() {
955 MockRaftActorContext actorContext = createActorContext();
956 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
957 followerActor.path().toString()).build());
// Builds a MockRaftActorContext for the given id and actor, configured with a 50 ms heartbeat
// interval and an election timeout factor of 100000.
// NOTE(review): the large factor presumably keeps election timeouts from firing during a test -
// confirm. The return statement is not visible in this view.
961 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
962 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
963 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
964 configParams.setElectionTimeoutFactor(100000);
965 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
966 context.setConfigParams(configParams);
// Creates a leader with commit index 1 and a follower with the same log and commit index, then
// checks the initial AppendEntries received by the follower (leaderCommit 1, no entries,
// prevLogIndex 0) and the follower's reply (last index 2, last term 1).
971 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
972 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
974 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
976 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Let the follower actor process incoming messages with a real Follower behavior.
978 Follower follower = new Follower(followerActorContext);
979 followerActor.underlyingActor().setBehavior(follower);
// NOTE(review): createActorContextWithFollower() already registers this same peer address.
981 Map<String, String> peerAddresses = new HashMap<>();
982 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
984 leaderActorContext.setPeerAddresses(peerAddresses);
// Drop the existing log content, then install a log built with createEntries(0, 3, 1).
986 leaderActorContext.getReplicatedLog().removeFrom(0);
989 leaderActorContext.setReplicatedLog(
990 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
992 leaderActorContext.setCommitIndex(1);
994 followerActorContext.getReplicatedLog().removeFrom(0);
996 // follower too has the exact same log entries and has the same commit index
997 followerActorContext.setReplicatedLog(
998 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1000 followerActorContext.setCommitIndex(1);
1002 leader = new Leader(leaderActorContext);
// An AppendEntries is expected at the follower without any explicit trigger from the test.
1004 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1006 assertEquals(1, appendEntries.getLeaderCommit());
1007 assertEquals(0, appendEntries.getEntries().size());
1008 assertEquals(0, appendEntries.getPrevLogIndex());
1010 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1011 leaderActor, AppendEntriesReply.class);
1013 assertEquals(2, appendEntriesReply.getLogLastIndex());
1014 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): the two assertions below repeat the two directly above.
1016 // follower returns its next index
1017 assertEquals(2, appendEntriesReply.getLogLastIndex());
1018 assertEquals(1, appendEntriesReply.getLogLastTerm());
// Creates a leader with commit index 1 while the follower, holding the same log, has commit index 2.
// Checks the initial heartbeat exchange, then feeds the follower's reply to the leader and checks
// that the next heartbeat carries leaderCommit 2 and prevLogIndex 2 and that the follower's commit
// index is still 2.
1024 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1025 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1027 MockRaftActorContext leaderActorContext = createActorContext();
1029 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Let the follower actor process incoming messages with a real Follower behavior.
1031 Follower follower = new Follower(followerActorContext);
1032 followerActor.underlyingActor().setBehavior(follower);
1034 Map<String, String> peerAddresses = new HashMap<>();
1035 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1037 leaderActorContext.setPeerAddresses(peerAddresses);
// Drop the existing log content, then install a log built with createEntries(0, 3, 1).
1039 leaderActorContext.getReplicatedLog().removeFrom(0);
1041 leaderActorContext.setReplicatedLog(
1042 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1044 leaderActorContext.setCommitIndex(1);
1046 followerActorContext.getReplicatedLog().removeFrom(0);
1048 followerActorContext.setReplicatedLog(
1049 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1051 // follower has the same log entries but its commit index > leaders commit index
1052 followerActorContext.setCommitIndex(2);
1054 leader = new Leader(leaderActorContext);
1056 // Initial heartbeat
1057 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1059 assertEquals(1, appendEntries.getLeaderCommit());
1060 assertEquals(0, appendEntries.getEntries().size());
1061 assertEquals(0, appendEntries.getPrevLogIndex());
1063 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1064 leaderActor, AppendEntriesReply.class);
1066 assertEquals(2, appendEntriesReply.getLogLastIndex());
1067 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): this installs the Follower behavior on the leader actor - confirm it is intended.
1069 leaderActor.underlyingActor().setBehavior(follower);
1070 leader.handleMessage(followerActor, appendEntriesReply);
// Forget the messages collected so far so the next expectations only see new traffic.
1072 leaderActor.underlyingActor().clear();
1073 followerActor.underlyingActor().clear();
// Wait one heartbeat interval, then drive the heartbeat by hand.
1075 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1076 TimeUnit.MILLISECONDS);
1078 leader.handleMessage(leaderActor, new SendHeartBeat());
1080 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1082 assertEquals(2, appendEntries.getLeaderCommit());
1083 assertEquals(0, appendEntries.getEntries().size());
1084 assertEquals(2, appendEntries.getPrevLogIndex());
1086 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1088 assertEquals(2, appendEntriesReply.getLogLastIndex());
1089 assertEquals(1, appendEntriesReply.getLogLastTerm());
1091 assertEquals(2, followerActorContext.getCommitIndex());
// Checks that a failed AppendEntriesReply lowers the follower's next index from 11 to 10 and that
// the returned behavior is still in the Leader state.
1097 public void testHandleAppendEntriesReplyFailure(){
1098 logStart("testHandleAppendEntriesReplyFailure");
1100 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1102 leader = new Leader(leaderActorContext);
1104 // Send initial heartbeat reply with last index.
1105 leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
1107 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1108 assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
// Same reply values as above, but with the success flag set to false.
1110 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
1112 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1114 assertEquals(RaftState.Leader, raftActorBehavior.state());
1116 assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
// Checks that a successful AppendEntriesReply reporting last index 2 moves the leader's commit index
// and last-applied from 1 to 2, and that leaderActor receives an ApplyJournalEntries up to index 2
// and exactly one ApplyState for log index 2.
1120 public void testHandleAppendEntriesReplySuccess() throws Exception {
1121 logStart("testHandleAppendEntriesReplySuccess");
1123 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1125 leaderActorContext.setReplicatedLog(
1126 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1128 leaderActorContext.setCommitIndex(1);
1129 leaderActorContext.setLastApplied(1);
1130 leaderActorContext.getTermInformation().update(1, "leader");
1132 leader = new Leader(leaderActorContext);
1134 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
1136 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1138 assertEquals(RaftState.Leader, raftActorBehavior.state());
1140 assertEquals(2, leaderActorContext.getCommitIndex());
1142 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1143 leaderActor, ApplyJournalEntries.class);
1145 assertEquals(2, leaderActorContext.getLastApplied());
1147 assertEquals(2, applyJournalEntries.getToIndex());
// NOTE(review): the continuation of this call (its remaining argument) is not visible in this view.
1149 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1152 assertEquals(1,applyStateList.size());
1154 ApplyState applyState = applyStateList.get(0);
1156 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
// Checks that an AppendEntriesReply whose follower id is not among the leader's peers (the context
// is created without peers here) leaves the behavior in the Leader state.
1160 public void testHandleAppendEntriesReplyUnknownFollower(){
1161 logStart("testHandleAppendEntriesReplyUnknownFollower");
1163 MockRaftActorContext leaderActorContext = createActorContext();
1165 leader = new Leader(leaderActorContext);
1167 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
1169 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1171 assertEquals(RaftState.Leader, raftActorBehavior.state());
// Checks that both a granted and a denied RequestVoteReply leave the behavior in the Leader state.
1175 public void testHandleRequestVoteReply(){
1176 logStart("testHandleRequestVoteReply");
1178 MockRaftActorContext leaderActorContext = createActorContext();
1180 leader = new Leader(leaderActorContext);
1182 // Should be a no-op.
1183 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1184 new RequestVoteReply(1, true));
1186 assertEquals(RaftState.Leader, raftActorBehavior.state());
// Same expectation for a reply with the vote flag set to false.
1188 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1190 assertEquals(RaftState.Leader, raftActorBehavior.state());
// Checks that an IsolatedLeaderCheck on a leader whose context was created without peers returns a
// behavior that is still a Leader.
1194 public void testIsolatedLeaderCheckNoFollowers() {
1195 logStart("testIsolatedLeaderCheckNoFollowers");
1197 MockRaftActorContext leaderActorContext = createActorContext();
1199 leader = new Leader(leaderActorContext);
1200 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1201 Assert.assertTrue(behavior instanceof Leader);
// Checks the result of IsolatedLeaderCheck with two peers: Leader while both are marked active,
// Leader while only one is marked inactive, IsolatedLeader once both are marked inactive.
1205 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1206 logStart("testIsolatedLeaderCheckTwoFollowers");
1208 new JavaTestKit(getSystem()) {{
// NOTE(review): both references come from getTestActor() on the same kit, so they presumably
// denote the same actor - confirm.
1210 ActorRef followerActor1 = getTestActor();
1211 ActorRef followerActor2 = getTestActor();
1213 MockRaftActorContext leaderActorContext = createActorContext();
1215 Map<String, String> peerAddresses = new HashMap<>();
1216 peerAddresses.put("follower-1", followerActor1.path().toString());
1217 peerAddresses.put("follower-2", followerActor2.path().toString());
1219 leaderActorContext.setPeerAddresses(peerAddresses);
1221 leader = new Leader(leaderActorContext);
// Case 1: both followers marked active.
1223 leader.markFollowerActive("follower-1");
1224 leader.markFollowerActive("follower-2");
1225 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1226 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1227 behavior instanceof Leader);
1229 // kill 1 follower and verify if that got killed
1230 final JavaTestKit probe = new JavaTestKit(getSystem());
1231 probe.watch(followerActor1);
1232 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1233 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1234 assertEquals(termMsg1.getActor(), followerActor1);
// Case 2: one follower marked inactive, the other still active.
1236 leader.markFollowerInActive("follower-1");
1237 leader.markFollowerActive("follower-2");
1238 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1239 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1240 behavior instanceof Leader);
1242 // kill 2nd follower and leader should change to Isolated leader
// NOTE(review): a PoisonPill is sent here (with a null sender) before the probe starts watching,
// and a second one is sent right after the watch.
1243 followerActor2.tell(PoisonPill.getInstance(), null);
1244 probe.watch(followerActor2);
1245 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1246 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1247 assertEquals(termMsg2.getActor(), followerActor2);
// Case 3: both followers marked inactive.
1249 leader.markFollowerInActive("follower-2");
1250 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1251 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1252 behavior instanceof IsolatedLeader);
// Checks that handling an AppendEntriesReply makes the leader send the next log entry right away:
// after a heartbeat delivers entry 0, passing the follower's reply to the leader yields an
// AppendEntries with entry 1 although the test sends no further heartbeat.
1258 public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1259 logStart("testAppendEntryCallAtEndofAppendEntryReply");
1261 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// Replaces the config installed by createActorContext(String, ActorRef) (50 ms heartbeat) with a
// fresh DefaultConfigParamsImpl; only the isolated-leader check interval is set explicitly.
1263 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1264 //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1265 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1267 leaderActorContext.setConfigParams(configParams);
1269 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1271 followerActorContext.setConfigParams(configParams);
1273 Follower follower = new Follower(followerActorContext);
1274 followerActor.underlyingActor().setBehavior(follower);
// Start both sides with the log cleared from index 0 and commit/applied indices of -1.
1276 leaderActorContext.getReplicatedLog().removeFrom(0);
1277 leaderActorContext.setCommitIndex(-1);
1278 leaderActorContext.setLastApplied(-1);
1280 followerActorContext.getReplicatedLog().removeFrom(0);
1281 followerActorContext.setCommitIndex(-1);
1282 followerActorContext.setLastApplied(-1);
1284 leader = new Leader(leaderActorContext);
1286 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1287 leaderActor, AppendEntriesReply.class);
1289 leader.handleMessage(followerActor, appendEntriesReply);
1291 // Clear initial heartbeat messages
1293 leaderActor.underlyingActor().clear();
1294 followerActor.underlyingActor().clear();
// Give the leader a log built with createEntries(0, 3, 1) that the follower does not have yet.
1297 leaderActorContext.setReplicatedLog(
1298 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1299 leaderActorContext.setCommitIndex(1);
1300 leaderActorContext.setLastApplied(1);
// Wait one heartbeat interval, then drive the heartbeat by hand.
1302 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1303 TimeUnit.MILLISECONDS);
1305 leader.handleMessage(leaderActor, new SendHeartBeat());
1307 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1309 // Should send first log entry
1310 assertEquals(1, appendEntries.getLeaderCommit());
1311 assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1312 assertEquals(-1, appendEntries.getPrevLogIndex());
1314 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1316 assertEquals(1, appendEntriesReply.getLogLastTerm());
1317 assertEquals(0, appendEntriesReply.getLogLastIndex());
1319 followerActor.underlyingActor().clear();
// Hand the follower's reply to the leader directly; no heartbeat is sent after this point.
1321 leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1323 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1325 // Should send second log entry
1326 assertEquals(1, appendEntries.getLeaderCommit());
1327 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
// Regression check tied to bug 2733 (see the comment inside the loop): while one follower keeps
// sending AppendEntriesReply messages, the leader must still receive more than one SendHeartBeat
// and the second follower must still receive more than one AppendEntries.
// The loop sleeps 200 ms per iteration, so this test takes at least about one second.
1333 public void testLaggingFollowerStarvation() throws Exception {
1334 logStart("testLaggingFollowerStarvation");
1335 new JavaTestKit(getSystem()) {{
1336 String leaderActorId = actorFactory.generateActorId("leader");
1337 String follower1ActorId = actorFactory.generateActorId("follower");
1338 String follower2ActorId = actorFactory.generateActorId("follower");
// The leader actor forwards messages to the behavior set below; the followers only collect messages.
1340 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1341 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1342 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1343 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1345 MockRaftActorContext leaderActorContext =
1346 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
// Heartbeat interval of 200 ms, matching the sleep used in the loop below.
1348 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1349 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1350 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1352 leaderActorContext.setConfigParams(configParams);
1354 leaderActorContext.setReplicatedLog(
1355 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1357 Map<String, String> peerAddresses = new HashMap<>();
1358 peerAddresses.put(follower1ActorId,
1359 follower1Actor.path().toString());
1360 peerAddresses.put(follower2ActorId,
1361 follower2Actor.path().toString());
1363 leaderActorContext.setPeerAddresses(peerAddresses);
1364 leaderActorContext.getTermInformation().update(1, leaderActorId);
1366 RaftActorBehavior leader = createBehavior(leaderActorContext);
1368 leaderActor.underlyingActor().setBehavior(leader);
// Five successful replies from follower 1, reporting last index 1 through 5, one per 200 ms.
1370 for(int i=1;i<6;i++) {
1371 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1372 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
1373 assertTrue(newBehavior == leader);
1374 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1377 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1378 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1380 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1381 heartbeats.size() > 1);
1383 // Check if follower-2 got AppendEntries during this time and was not starved
1384 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1386 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1387 appendEntries.size() > 1);
// Runs the inherited check and additionally asserts that votedFor in the context's term
// information is null afterwards.
1393 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1394 ActorRef actorRef, RaftRPC rpc) throws Exception {
1395 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1396 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
// Config params variant whose election timeout interval (in milliseconds) and snapshot chunk size
// are fixed at construction time and returned by the two overridden getters.
// NOTE(review): closing braces and any further members are not visible in this view.
1399 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1401 private final long electionTimeOutIntervalMillis;
1402 private final int snapshotChunkSize;
1404 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1406 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1407 this.snapshotChunkSize = snapshotChunkSize;
// Wraps the stored millisecond value in a FiniteDuration.
1411 public FiniteDuration getElectionTimeOutInterval() {
1412 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1416 public int getSnapshotChunkSize() {
1417 return snapshotChunkSize;