1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
29 import org.opendaylight.controller.cluster.raft.SerializationUtils;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
34 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
37 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
40 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import scala.concurrent.duration.FiniteDuration;
48 public class LeaderTest extends AbstractLeaderTest {
// Follower identifier used in the simulated replies and in leader.getFollower(...) lookups.
50 static final String FOLLOWER_ID = "follower";
// Leader identifier constant.
51 public static final String LEADER_ID = "leader";
// Test actor used as the sender for leader-side messages (SendHeartBeat, Replicate, ...);
// tests also collect ApplyState messages from it.
53 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
54 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
// Test actor standing in for the follower; tests inspect the AppendEntries and
// InstallSnapshot messages collected from it.
56 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
57 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
// Leader behavior under test; each test assigns a fresh instance.
59 private Leader leader;
63 public void tearDown() throws Exception {
// Verifies that handling an unrecognized message (a plain String) returns a Leader behavior.
72 public void testHandleMessageForUnknownMessage() throws Exception {
73 logStart("testHandleMessageForUnknownMessage");
75 leader = new Leader(createActorContext());
77 // handleMessage should return the Leader state when it receives an unknown message.
79 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
80 Assert.assertTrue(behavior instanceof Leader);
// Verifies the AppendEntries messages a newly created Leader sends to its follower:
// an empty one right after construction and, once the follower has replied and a
// SendHeartBeat has been handled, one carrying the last log entry.
// NOTE(review): `term` is declared on a line that is not part of this listing.
84 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
85 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
87 MockRaftActorContext actorContext = createActorContextWithFollower();
// Payload version set on the context; both AppendEntries below are asserted to carry it.
88 short payloadVersion = (short)5;
89 actorContext.setPayloadVersion(payloadVersion);
92 actorContext.getTermInformation().update(term, "");
94 leader = new Leader(actorContext);
96 // Leader should send an immediate heartbeat with no entries as follower is inactive.
97 long lastIndex = actorContext.getReplicatedLog().lastIndex();
98 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
99 assertEquals("getTerm", term, appendEntries.getTerm());
100 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
101 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
102 assertEquals("Entries size", 0, appendEntries.getEntries().size());
103 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
105 // The follower would normally reply - simulate that explicitly here.
// The reply passes lastIndex - 1 (presumably the follower's last log index - confirm against
// AppendEntriesReply), which matches the prevLogIndex asserted on the next AppendEntries.
106 leader.handleMessage(followerActor, new AppendEntriesReply(
107 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
108 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the messages collected so far so the next lookup only sees new ones.
110 followerActor.underlyingActor().clear();
112 // Sleep for the heartbeat interval so AppendEntries is sent.
113 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
114 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
116 leader.handleMessage(leaderActor, new SendHeartBeat());
// This AppendEntries is expected to contain exactly the entry at lastIndex.
118 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
119 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
120 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
121 assertEquals("Entries size", 1, appendEntries.getEntries().size());
122 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
123 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
124 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
// Helper: creates a MockReplicatedLogEntry with payload "foo", appends it to the context's
// replicated log and passes a Replicate message for it (null client actor and identifier
// arguments) to the leader. Returns whatever leader.handleMessage returns.
// NOTE(review): the entry's constructor arguments are on a line missing from this listing;
// callers pass lastIndex + n as `index`, so it is presumably the entry's log index - confirm.
128 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
129 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
130 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
132 actorContext.getReplicatedLog().append(newEntry);
133 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
// Verifies that handling a Replicate message makes the leader send the follower an
// AppendEntries containing exactly the new entry, and that the behavior stays Leader.
137 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
138 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
140 MockRaftActorContext actorContext = createActorContextWithFollower();
143 actorContext.getTermInformation().update(term, "");
145 leader = new Leader(actorContext);
147 // Leader will send an immediate heartbeat - ignore it.
148 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
150 // The follower would normally reply - simulate that explicitly here.
151 long lastIndex = actorContext.getReplicatedLog().lastIndex();
152 leader.handleMessage(followerActor, new AppendEntriesReply(
153 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
154 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so only the AppendEntries caused by Replicate is inspected.
156 followerActor.underlyingActor().clear();
// NOTE(review): newEntry is appended to the log here, and sendReplicate() below appends
// another entry of its own to the same log - confirm the double append is intended.
158 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
159 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
160 1, lastIndex + 1, payload);
161 actorContext.getReplicatedLog().append(newEntry);
162 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
164 // State should not change
165 assertTrue(raftBehavior instanceof Leader);
167 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
168 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
169 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
170 assertEquals("Entries size", 1, appendEntries.getEntries().size());
171 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
172 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
173 assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
// Verifies that several Replicate messages handled back-to-back, with no follower reply in
// between and a 5 second heartbeat interval, result in only a single AppendEntries.
177 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
// Log this test's own name (previously another test's name was logged here).
178 logStart("testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent");
180 MockRaftActorContext actorContext = createActorContextWithFollower();
// Long heartbeat interval so that no heartbeat-driven AppendEntries is expected during the test.
181 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
183 public FiniteDuration getHeartBeatInterval() {
184 return FiniteDuration.apply(5, TimeUnit.SECONDS);
189 actorContext.getTermInformation().update(term, "");
191 leader = new Leader(actorContext);
193 // Leader will send an immediate heartbeat - ignore it.
194 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
196 // The follower would normally reply - simulate that explicitly here.
197 long lastIndex = actorContext.getReplicatedLog().lastIndex();
198 leader.handleMessage(followerActor, new AppendEntriesReply(
199 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
200 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so only messages caused by the Replicate calls are counted.
202 followerActor.underlyingActor().clear();
// Five consecutive replicates with no AppendEntriesReply in between.
204 for(int i=0;i<5;i++) {
205 sendReplicate(actorContext, lastIndex+i+1);
208 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
209 // We expect only 1 message to be sent because of two reasons,
210 // - an append entries reply was not received
211 // - the heartbeat interval has not expired
212 // In this scenario if multiple messages are sent they would likely be duplicates
213 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
// Verifies that when the follower acknowledges each replicated entry, the next Replicate
// produces a new AppendEntries, while a Replicate that follows an unacknowledged one does not.
217 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
218 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
220 MockRaftActorContext actorContext = createActorContextWithFollower();
// Long heartbeat interval so that no heartbeat-driven AppendEntries is expected during the test.
221 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
223 public FiniteDuration getHeartBeatInterval() {
224 return FiniteDuration.apply(5, TimeUnit.SECONDS);
229 actorContext.getTermInformation().update(term, "");
231 leader = new Leader(actorContext);
233 // Leader will send an immediate heartbeat - ignore it.
234 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
236 // The follower would normally reply - simulate that explicitly here.
237 long lastIndex = actorContext.getReplicatedLog().lastIndex();
238 leader.handleMessage(followerActor, new AppendEntriesReply(
239 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
240 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so only messages caused by the Replicate calls are counted.
242 followerActor.underlyingActor().clear();
// First three replicates: each one is followed by a successful reply from the follower.
244 for(int i=0;i<3;i++) {
245 sendReplicate(actorContext, lastIndex+i+1);
246 leader.handleMessage(followerActor, new AppendEntriesReply(
247 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
// Last two replicates: no reply is simulated in between.
251 for(int i=3;i<5;i++) {
252 sendReplicate(actorContext, lastIndex + i + 1);
255 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
256 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
257 // get sent to the follower - but not the 5th
258 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
// The first entry of message i is expected at index i + 2. The expected value is passed
// first so that a failure reports expected and actual the right way round.
260 for(int i=0;i<4;i++) {
261 long actual = allMessages.get(i).getEntries().get(0).getIndex();
262 assertEquals(i+2, actual);
// Verifies that an entry which has been sent but not acknowledged is sent again when a
// SendHeartBeat is handled after the heartbeat interval (500 ms here) has elapsed.
267 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
268 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
270 MockRaftActorContext actorContext = createActorContextWithFollower();
271 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
273 public FiniteDuration getHeartBeatInterval() {
274 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
279 actorContext.getTermInformation().update(term, "");
281 leader = new Leader(actorContext);
283 // Leader will send an immediate heartbeat - ignore it.
284 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
286 // The follower would normally reply - simulate that explicitly here.
287 long lastIndex = actorContext.getReplicatedLog().lastIndex();
288 leader.handleMessage(followerActor, new AppendEntriesReply(
289 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
290 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so only the two messages of interest are collected.
292 followerActor.underlyingActor().clear();
294 sendReplicate(actorContext, lastIndex+1);
296 // Wait slightly longer than heartbeat duration
297 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
299 leader.handleMessage(leaderActor, new SendHeartBeat());
301 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
302 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// First message: the AppendEntries triggered by the Replicate.
304 assertEquals(1, allMessages.get(0).getEntries().size());
305 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
// Second message: the duplicate sent on heartbeat. Its index is checked on message 1
// (previously message 0 was checked twice, so the duplicate was never verified).
306 assertEquals(1, allMessages.get(1).getEntries().size());
307 assertEquals(lastIndex+1, allMessages.get(1).getEntries().get(0).getIndex());
// Verifies that every SendHeartBeat handled after the heartbeat interval (100 ms here) has
// elapsed results in an AppendEntries to the follower; no Replicate is issued in this test.
312 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
313 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
315 MockRaftActorContext actorContext = createActorContextWithFollower();
316 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
318 public FiniteDuration getHeartBeatInterval() {
319 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
324 actorContext.getTermInformation().update(term, "");
326 leader = new Leader(actorContext);
328 // Leader will send an immediate heartbeat - ignore it.
329 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
331 // The follower would normally reply - simulate that explicitly here.
332 long lastIndex = actorContext.getReplicatedLog().lastIndex();
333 leader.handleMessage(followerActor, new AppendEntriesReply(
334 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
335 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so only the three heartbeats below are counted.
337 followerActor.underlyingActor().clear();
// Each iteration sleeps 150 ms (longer than the 100 ms interval) before the heartbeat.
339 for(int i=0;i<3;i++) {
340 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
341 leader.handleMessage(leaderActor, new SendHeartBeat());
344 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
345 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
// Verifies that a Replicate handled right after a heartbeat still produces its own
// AppendEntries: the heartbeat message has no entries and the following one has one.
349 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
350 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
352 MockRaftActorContext actorContext = createActorContextWithFollower();
353 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
355 public FiniteDuration getHeartBeatInterval() {
356 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
361 actorContext.getTermInformation().update(term, "");
363 leader = new Leader(actorContext);
365 // Leader will send an immediate heartbeat - ignore it.
366 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
368 // The follower would normally reply - simulate that explicitly here.
369 long lastIndex = actorContext.getReplicatedLog().lastIndex();
370 leader.handleMessage(followerActor, new AppendEntriesReply(
371 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
372 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so only the two messages of interest are collected.
374 followerActor.underlyingActor().clear();
// Sleep past the 100 ms heartbeat interval, then send a heartbeat immediately followed by a Replicate.
376 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
377 leader.handleMessage(leaderActor, new SendHeartBeat());
378 sendReplicate(actorContext, lastIndex+1);
380 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
381 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// Message 0 is the empty heartbeat, message 1 carries the replicated entry.
383 assertEquals(0, allMessages.get(0).getEntries().size());
384 assertEquals(1, allMessages.get(1).getEntries().size());
// Verifies that, with a context created without a follower, handling a Replicate advances
// the commit index to the new entry and that ApplyState messages for indices 1 through
// newLogIndex are collected from the leader actor.
389 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
390 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
392 MockRaftActorContext actorContext = createActorContext();
394 leader = new Leader(actorContext);
// lastApplied is set to 0, so entries after index 0 are expected to be applied (see below).
396 actorContext.setLastApplied(0);
398 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
399 long term = actorContext.getTermInformation().getCurrentTerm();
400 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
401 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
403 actorContext.getReplicatedLog().append(newEntry);
405 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
406 new Replicate(leaderActor, "state-id", newEntry));
408 // State should not change
409 assertTrue(raftBehavior instanceof Leader);
411 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
413 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
414 // one since lastApplied state is 0.
415 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
416 leaderActor, ApplyState.class);
417 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
// ApplyState i is expected to carry the entry at index i + 1 with the current term.
419 for(int i = 0; i <= newLogIndex - 1; i++ ) {
420 ApplyState applyState = applyStateList.get(i);
421 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
422 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
// The last ApplyState must correspond to the entry just replicated and carry its identifier.
425 ApplyState last = applyStateList.get((int) newLogIndex - 1);
426 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
427 assertEquals("getIdentifier", "state-id", last.getIdentifier());
// Verifies heartbeat handling while a snapshot install to the follower is in progress:
// before the current chunk is marked as sent successfully, a SendHeartBeat yields an
// AppendEntries with no entries; afterwards a SendHeartBeat yields an InstallSnapshot.
431 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
432 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
434 MockRaftActorContext actorContext = createActorContextWithFollower();
// Snapshot content that is serialized via toByteString(...) below.
436 Map<String, String> leadersSnapshot = new HashMap<>();
437 leadersSnapshot.put("1", "A");
438 leadersSnapshot.put("2", "B");
439 leadersSnapshot.put("3", "C");
// Empty the replicated log from index 0 onwards.
442 actorContext.getReplicatedLog().removeFrom(0);
444 final int followersLastIndex = 2;
445 final int snapshotIndex = 3;
446 final int newEntryIndex = 4;
447 final int snapshotTerm = 1;
448 final int currentTerm = 2;
450 // set the snapshot variables in replicatedlog
451 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
452 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
453 actorContext.setCommitIndex(followersLastIndex);
454 //set follower timeout to 2 mins, helps during debugging
455 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
457 leader = new Leader(actorContext);
// NOTE(review): `entry` is not used in the visible lines of this test.
460 ReplicatedLogImplEntry entry =
461 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
462 new MockRaftActorContext.MockPayload("D"));
464 //update follower timestamp
465 leader.markFollowerActive(FOLLOWER_ID);
// Give the leader the serialized snapshot and register snapshot-transfer state for the follower.
467 ByteString bs = toByteString(leadersSnapshot);
468 leader.setSnapshot(Optional.of(bs));
469 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
470 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
472 //send first chunk and no InstallSnapshotReply received yet
474 fts.incrementChunkIndex();
476 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
477 TimeUnit.MILLISECONDS);
479 leader.handleMessage(leaderActor, new SendHeartBeat());
481 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
483 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
485 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
487 //InstallSnapshotReply received
488 fts.markSendStatus(true);
490 leader.handleMessage(leaderActor, new SendHeartBeat());
492 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
494 assertEquals(snapshotIndex, is.getLastIncludedIndex());
// Verifies that replicating a new entry while the follower is behind the leader's snapshot
// index starts a snapshot capture (SnapshotManager.isCapturing() is true) and that the
// behavior stays Leader.
498 public void testSendAppendEntriesSnapshotScenario() throws Exception {
499 logStart("testSendAppendEntriesSnapshotScenario");
501 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): leadersSnapshot is populated but not read in the visible lines of this test.
503 Map<String, String> leadersSnapshot = new HashMap<>();
504 leadersSnapshot.put("1", "A");
505 leadersSnapshot.put("2", "B");
506 leadersSnapshot.put("3", "C");
// Empty the replicated log from index 0 onwards.
509 actorContext.getReplicatedLog().removeFrom(0);
511 final int followersLastIndex = 2;
512 final int snapshotIndex = 3;
513 final int newEntryIndex = 4;
514 final int snapshotTerm = 1;
515 final int currentTerm = 2;
517 // set the snapshot variables in replicatedlog
518 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
519 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
520 actorContext.setCommitIndex(followersLastIndex);
522 leader = new Leader(actorContext);
524 // Leader will send an immediate heartbeat - ignore it.
525 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// New entry that is appended to the leader's log and then replicated.
528 ReplicatedLogImplEntry entry =
529 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
530 new MockRaftActorContext.MockPayload("D"));
532 actorContext.getReplicatedLog().append(entry);
534 //update follower timestamp
535 leader.markFollowerActive(FOLLOWER_ID);
537 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
538 RaftActorBehavior raftBehavior = leader.handleMessage(
539 leaderActor, new Replicate(null, "state-id", entry));
541 assertTrue(raftBehavior instanceof Leader);
543 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
// Verifies the CaptureSnapshot created when a Replicate is handled while the leader holds
// no snapshot, and that a second Replicate during the capture leaves that same
// CaptureSnapshot instance in place.
547 public void testInitiateInstallSnapshot() throws Exception {
548 logStart("testInitiateInstallSnapshot");
550 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): leadersSnapshot is populated but not read in the visible lines of this test.
552 Map<String, String> leadersSnapshot = new HashMap<>();
553 leadersSnapshot.put("1", "A");
554 leadersSnapshot.put("2", "B");
555 leadersSnapshot.put("3", "C");
// Empty the replicated log from index 0 onwards.
558 actorContext.getReplicatedLog().removeFrom(0);
560 final int followersLastIndex = 2;
561 final int snapshotIndex = 3;
562 final int newEntryIndex = 4;
563 final int snapshotTerm = 1;
564 final int currentTerm = 2;
566 // set the snapshot variables in replicatedlog
567 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
568 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
569 actorContext.setLastApplied(3);
570 actorContext.setCommitIndex(followersLastIndex);
572 leader = new Leader(actorContext);
574 // Leader will send an immediate heartbeat - ignore it.
575 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
577 // set the snapshot as absent and check if capture-snapshot is invoked.
578 leader.setSnapshot(Optional.<ByteString>absent());
// New entry that is appended to the leader's log and then replicated.
581 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
582 new MockRaftActorContext.MockPayload("D"));
584 actorContext.getReplicatedLog().append(entry);
586 //update follower timestamp
587 leader.markFollowerActive(FOLLOWER_ID);
589 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
591 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
593 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
// Expected values mirror the setup above: lastApplied 3 / snapshotTerm 1 and
// newEntryIndex 4 / currentTerm 2.
595 assertTrue(cs.isInstallSnapshotInitiated());
596 assertEquals(3, cs.getLastAppliedIndex());
597 assertEquals(1, cs.getLastAppliedTerm());
598 assertEquals(4, cs.getLastIndex());
599 assertEquals(2, cs.getLastTerm());
601 // If a capture is initiated again while the first is still in progress, it shouldn't initiate another capture.
602 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
604 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
// Verifies that handling SendInstallSnapshot sends the follower an InstallSnapshot that
// carries data, the log's snapshot index and term, and the current term.
608 public void testInstallSnapshot() throws Exception {
609 logStart("testInstallSnapshot");
611 MockRaftActorContext actorContext = createActorContextWithFollower();
// Snapshot content that is serialized via toByteString(...) below.
613 Map<String, String> leadersSnapshot = new HashMap<>();
614 leadersSnapshot.put("1", "A");
615 leadersSnapshot.put("2", "B");
616 leadersSnapshot.put("3", "C");
// Empty the replicated log from index 0 onwards.
619 actorContext.getReplicatedLog().removeFrom(0);
621 final int followersLastIndex = 2;
622 final int snapshotIndex = 3;
623 final int snapshotTerm = 1;
624 final int currentTerm = 2;
626 // set the snapshot variables in replicatedlog
627 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
628 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
629 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
630 actorContext.setCommitIndex(followersLastIndex);
632 leader = new Leader(actorContext);
634 // Ignore initial heartbeat.
635 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
637 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
638 new SendInstallSnapshot(toByteString(leadersSnapshot)));
640 assertTrue(raftBehavior instanceof Leader);
642 // check if installsnapshot gets called with the correct values.
644 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
646 assertNotNull(installSnapshot.getData());
647 assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
648 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
650 assertEquals(currentTerm, installSnapshot.getTerm());
// Verifies that a successful InstallSnapshotReply for the last chunk leaves no follower
// snapshot state on the leader and sets the follower's match index to the snapshot index
// and its next index to snapshotIndex + 1.
654 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
655 logStart("testHandleInstallSnapshotReplyLastChunk");
657 MockRaftActorContext actorContext = createActorContextWithFollower();
659 final int followersLastIndex = 2;
660 final int snapshotIndex = 3;
661 final int snapshotTerm = 1;
662 final int currentTerm = 2;
664 actorContext.setCommitIndex(followersLastIndex);
666 leader = new Leader(actorContext);
668 // Ignore initial heartbeat.
669 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Snapshot content that is serialized via toByteString(...) below.
671 Map<String, String> leadersSnapshot = new HashMap<>();
672 leadersSnapshot.put("1", "A");
673 leadersSnapshot.put("2", "B");
674 leadersSnapshot.put("3", "C");
676 // set the snapshot variables in replicatedlog
678 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
679 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
680 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
// Give the leader the serialized snapshot and register snapshot-transfer state for the follower.
682 ByteString bs = toByteString(leadersSnapshot);
683 leader.setSnapshot(Optional.of(bs));
684 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
685 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
// Advance the chunk index until it points at the last chunk.
686 while(!fts.isLastChunk(fts.getChunkIndex())) {
688 fts.incrementChunkIndex();
// Empty the replicated log from index 0 onwards.
692 actorContext.getReplicatedLog().removeFrom(0);
694 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
695 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
697 assertTrue(raftBehavior instanceof Leader);
699 assertEquals(0, leader.followerSnapshotSize());
700 assertEquals(1, leader.followerLogSize());
701 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
// The match index is asserted once; an identical second assertion was removed as redundant.
703 assertEquals(snapshotIndex, fli.getMatchIndex());
705 assertEquals(snapshotIndex + 1, fli.getNextIndex());
// Verifies chunked snapshot transfer driven by InstallSnapshotReply: chunk 1 of 3 is sent on
// SendInstallSnapshot, each successful reply triggers another InstallSnapshot (chunk 2, then
// a third message), and a reply to that third message sends nothing further.
709 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
710 logStart("testSendSnapshotfromInstallSnapshotReply");
712 MockRaftActorContext actorContext = createActorContextWithFollower();
714 final int followersLastIndex = 2;
715 final int snapshotIndex = 3;
716 final int snapshotTerm = 1;
717 final int currentTerm = 2;
// NOTE(review): the value returned by the overridden getSnapshotChunkSize() is on a line
// missing from this listing; the assertions below expect the snapshot to split into 3 chunks.
719 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
721 public int getSnapshotChunkSize() {
// Heartbeat and isolated-leader-check intervals are set to 9 s and 10 s respectively.
725 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
726 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
728 actorContext.setConfigParams(configParams);
729 actorContext.setCommitIndex(followersLastIndex);
731 leader = new Leader(actorContext);
// Snapshot content that is serialized via toByteString(...) below.
733 Map<String, String> leadersSnapshot = new HashMap<>();
734 leadersSnapshot.put("1", "A");
735 leadersSnapshot.put("2", "B");
736 leadersSnapshot.put("3", "C");
738 // set the snapshot variables in replicatedlog
739 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
740 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
741 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
743 ByteString bs = toByteString(leadersSnapshot);
744 leader.setSnapshot(Optional.of(bs));
746 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
748 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
750 assertEquals(1, installSnapshot.getChunkIndex());
751 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 1; the next InstallSnapshot is expected to be chunk 2.
753 followerActor.underlyingActor().clear();
754 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
755 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
757 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
759 assertEquals(2, installSnapshot.getChunkIndex());
760 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 2; another InstallSnapshot is expected (its chunk index is not asserted).
762 followerActor.underlyingActor().clear();
763 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
764 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
766 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
768 // Send the snapshot reply one more time and make sure that no new snapshot message is sent to the follower.
769 followerActor.underlyingActor().clear();
770 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
771 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
// getFirstMatching (unlike expectFirstMatching above) is used with a null check here,
// since no message is expected.
773 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
775 Assert.assertNull(installSnapshot);
// Verifies that after an unsuccessful InstallSnapshotReply with chunk index -1, the next
// SendHeartBeat makes the leader send chunk 1 of 3 to the follower again.
780 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
781 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
783 MockRaftActorContext actorContext = createActorContextWithFollower();
785 final int followersLastIndex = 2;
786 final int snapshotIndex = 3;
787 final int snapshotTerm = 1;
788 final int currentTerm = 2;
// NOTE(review): the value returned by the overridden getSnapshotChunkSize() is on a line
// missing from this listing; the assertions below expect the snapshot to split into 3 chunks.
790 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
792 public int getSnapshotChunkSize() {
797 actorContext.setCommitIndex(followersLastIndex);
799 leader = new Leader(actorContext);
// Snapshot content that is serialized via toByteString(...) below.
801 Map<String, String> leadersSnapshot = new HashMap<>();
802 leadersSnapshot.put("1", "A");
803 leadersSnapshot.put("2", "B");
804 leadersSnapshot.put("3", "C");
806 // set the snapshot variables in replicatedlog
807 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
808 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
809 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
811 ByteString bs = toByteString(leadersSnapshot);
812 leader.setSnapshot(Optional.of(bs));
// NOTE(review): the reason for this 1 second pause is not evident from the visible code.
814 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
815 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
817 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
819 assertEquals(1, installSnapshot.getChunkIndex());
820 assertEquals(3, installSnapshot.getTotalChunks());
// Drop the first chunk message so the resend can be detected.
822 followerActor.underlyingActor().clear();
// Simulate a failed reply: chunk index -1 and success flag false.
824 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
825 FOLLOWER_ID, -1, false));
// Wait for the heartbeat interval before triggering the heartbeat.
827 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
828 TimeUnit.MILLISECONDS);
830 leader.handleMessage(leaderActor, new SendHeartBeat());
832 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
// The same first chunk is expected again.
834 assertEquals(1, installSnapshot.getChunkIndex());
835 assertEquals(3, installSnapshot.getTotalChunks());
// Checks the lastChunkHashCode carried by consecutive InstallSnapshot messages: the first chunk carries
// AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE and the second carries the hashCode of the first chunk's data.
839 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
840 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
842 MockRaftActorContext actorContext = createActorContextWithFollower();
844 final int followersLastIndex = 2;
845 final int snapshotIndex = 3;
846 final int snapshotTerm = 1;
847 final int currentTerm = 2;
// Override the snapshot chunk size; the assertions below expect the snapshot to span 3 chunks.
849 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
851 public int getSnapshotChunkSize() {
856 actorContext.setCommitIndex(followersLastIndex);
858 leader = new Leader(actorContext);
// Snapshot payload: a small map that is serialized to a ByteString further down.
860 Map<String, String> leadersSnapshot = new HashMap<>();
861 leadersSnapshot.put("1", "A");
862 leadersSnapshot.put("2", "B");
863 leadersSnapshot.put("3", "C");
865 // set the snapshot variables in replicatedlog
866 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
867 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
868 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
870 ByteString bs = toByteString(leadersSnapshot);
871 leader.setSnapshot(Optional.of(bs));
873 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
875 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
877 assertEquals(1, installSnapshot.getChunkIndex());
878 assertEquals(3, installSnapshot.getTotalChunks());
879 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
// Remember the hash of the first chunk's data; the second chunk must report it as its last-chunk hash.
881 int hashCode = installSnapshot.getData().hashCode();
883 followerActor.underlyingActor().clear();
// Acknowledge chunk 1 as successful so the leader moves on to the next chunk.
885 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
886 FOLLOWER_ID, 1, true));
888 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
890 assertEquals(2, installSnapshot.getChunkIndex());
891 assertEquals(3, installSnapshot.getTotalChunks());
892 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
// Exercises FollowerToSnapshot directly: steps through the serialized snapshot 50 bytes at a time and
// checks the size and index of every chunk it hands out, then the total number of chunks.
896 public void testFollowerToSnapshotLogic() {
897 logStart("testFollowerToSnapshotLogic");
899 MockRaftActorContext actorContext = createActorContext();
// Override the snapshot chunk size used by the leader.
// NOTE(review): presumably 50, to match the step of the loop below - confirm.
901 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
903 public int getSnapshotChunkSize() {
908 leader = new Leader(actorContext);
910 Map<String, String> leadersSnapshot = new HashMap<>();
911 leadersSnapshot.put("1", "A");
912 leadersSnapshot.put("2", "B");
913 leadersSnapshot.put("3", "C");
915 ByteString bs = toByteString(leadersSnapshot);
916 byte[] barray = bs.toByteArray();
// FollowerToSnapshot is an inner class of the leader, hence the leader.new syntax.
918 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
919 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
921 assertEquals(bs.size(), barray.length);
// Walk the byte array in 50-byte steps, one iteration per expected chunk.
924 for (int i=0; i < barray.length; i = i + 50) {
// The final chunk may be shorter than 50 bytes.
928 if (i + 50 > barray.length) {
932 ByteString chunk = fts.getNextChunk();
933 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
934 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
// Mark the chunk as sent and advance, except after the last chunk.
936 fts.markSendStatus(true);
937 if (!fts.isLastChunk(chunkIndex)) {
938 fts.incrementChunkIndex();
942 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
// Returns a new Leader constructed from the given actor context.
945 @Override protected RaftActorBehavior createBehavior(
946 RaftActorContext actorContext) {
947 return new Leader(actorContext);
// Convenience overload: builds the context using leaderActor as the actor ref.
951 protected MockRaftActorContext createActorContext() {
952 return createActorContext(leaderActor);
// Convenience overload: builds the context with LEADER_ID as its id and the given actor ref.
956 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
957 return createActorContext(LEADER_ID, actorRef);
// Creates the default leader context and sets its peer addresses to a single entry mapping
// FOLLOWER_ID to followerActor's path.
960 private MockRaftActorContext createActorContextWithFollower() {
961 MockRaftActorContext actorContext = createActorContext();
962 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
963 followerActor.path().toString()).build());
// Builds a MockRaftActorContext for the given id and actor ref, configured with a 50 ms heartbeat
// interval and an election timeout factor of 100000.
967 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
968 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
969 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
// NOTE(review): presumably this large factor keeps election timeouts from firing during a test - confirm.
970 configParams.setElectionTimeoutFactor(100000);
971 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
972 context.setConfigParams(configParams);
// Creates a leader whose commit index (1) is behind its log, alongside a follower holding an identically
// built log and the same commit index, then checks the initial AppendEntries sent to the follower and
// the AppendEntriesReply that arrives at leaderActor.
977 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
978 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
980 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
982 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Install a Follower behavior on followerActor.
984 Follower follower = new Follower(followerActorContext);
985 followerActor.underlyingActor().setBehavior(follower);
987 Map<String, String> peerAddresses = new HashMap<>();
988 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
990 leaderActorContext.setPeerAddresses(peerAddresses);
// Empty the existing leader log, then install a freshly built one.
992 leaderActorContext.getReplicatedLog().removeFrom(0);
995 leaderActorContext.setReplicatedLog(
996 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
998 leaderActorContext.setCommitIndex(1);
1000 followerActorContext.getReplicatedLog().removeFrom(0);
1002 // follower too has the exact same log entries and has the same commit index
1003 followerActorContext.setReplicatedLog(
1004 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1006 followerActorContext.setCommitIndex(1);
1008 leader = new Leader(leaderActorContext);
// The first AppendEntries is expected to carry no entries, leader commit 1 and prevLogIndex 0.
1010 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1012 assertEquals(1, appendEntries.getLeaderCommit());
1013 assertEquals(0, appendEntries.getEntries().size());
1014 assertEquals(0, appendEntries.getPrevLogIndex());
1016 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1017 leaderActor, AppendEntriesReply.class);
1019 assertEquals(2, appendEntriesReply.getLogLastIndex());
1020 assertEquals(1, appendEntriesReply.getLogLastTerm());
1022 // follower returns its next index
// NOTE(review): the two assertions below repeat the pair directly above on the same reply object.
1023 assertEquals(2, appendEntriesReply.getLogLastIndex());
1024 assertEquals(1, appendEntriesReply.getLogLastTerm());
// Creates a leader with commit index 1 and a follower with an identically built log but commit index 2.
// After the leader processes the follower's first reply, the next heartbeat is expected to advertise
// leader commit 2 and prevLogIndex 2, and the follower's commit index is expected to remain 2.
1030 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1031 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1033 MockRaftActorContext leaderActorContext = createActorContext();
1035 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1036 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
// Install a Follower behavior on followerActor.
1038 Follower follower = new Follower(followerActorContext);
1039 followerActor.underlyingActor().setBehavior(follower);
1041 Map<String, String> leaderPeerAddresses = new HashMap<>();
1042 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1044 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
// Empty the existing leader log, then install a freshly built one.
1046 leaderActorContext.getReplicatedLog().removeFrom(0);
1048 leaderActorContext.setReplicatedLog(
1049 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1051 leaderActorContext.setCommitIndex(1);
1053 followerActorContext.getReplicatedLog().removeFrom(0);
1055 followerActorContext.setReplicatedLog(
1056 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1058 // follower has the same log entries but its commit index > leaders commit index
1059 followerActorContext.setCommitIndex(2);
1061 leader = new Leader(leaderActorContext);
1063 // Initial heartbeat
1064 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1066 assertEquals(1, appendEntries.getLeaderCommit());
1067 assertEquals(0, appendEntries.getEntries().size());
1068 assertEquals(0, appendEntries.getPrevLogIndex());
1070 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1071 leaderActor, AppendEntriesReply.class);
1073 assertEquals(2, appendEntriesReply.getLogLastIndex());
1074 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): this installs the Follower behavior on leaderActor - confirm that is intended.
1076 leaderActor.underlyingActor().setBehavior(follower);
// Feed the follower's reply to the leader by hand.
1077 leader.handleMessage(followerActor, appendEntriesReply);
// Discard everything collected so far so the checks below only see the next exchange.
1079 leaderActor.underlyingActor().clear();
1080 followerActor.underlyingActor().clear();
// Let one heartbeat interval elapse, then deliver SendHeartBeat to the leader by hand.
1082 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1083 TimeUnit.MILLISECONDS);
1085 leader.handleMessage(leaderActor, new SendHeartBeat());
1087 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1089 assertEquals(2, appendEntries.getLeaderCommit());
1090 assertEquals(0, appendEntries.getEntries().size());
1091 assertEquals(2, appendEntries.getPrevLogIndex());
1093 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1095 assertEquals(2, appendEntriesReply.getLogLastIndex());
1096 assertEquals(1, appendEntriesReply.getLogLastTerm());
1098 assertEquals(2, followerActorContext.getCommitIndex());
// Checks that a failed AppendEntriesReply keeps the leader in the Leader state and moves the follower's
// next index back from 11 to 10.
1104 public void testHandleAppendEntriesReplyFailure(){
1105 logStart("testHandleAppendEntriesReplyFailure");
1107 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1109 leader = new Leader(leaderActorContext);
1111 // Send initial heartbeat reply with last index.
1112 leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1, (short)0));
// A successful reply with last index 10 leaves the follower's next index at 11.
1114 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1115 assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
// Same reply fields, but with the success flag set to false.
1117 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1, (short)0);
1119 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1121 assertEquals(RaftState.Leader, raftActorBehavior.state());
1123 assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
// Checks the effects of a successful AppendEntriesReply reporting last index 2: the leader stays Leader,
// commit index and last applied move from 1 to 2, ApplyJournalEntries and a single ApplyState for
// index 2 reach leaderActor, and the follower's payload version is recorded.
1127 public void testHandleAppendEntriesReplySuccess() throws Exception {
1128 logStart("testHandleAppendEntriesReplySuccess");
1130 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1132 leaderActorContext.setReplicatedLog(
1133 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
// Start with commit index and last applied both at 1, in term 1.
1135 leaderActorContext.setCommitIndex(1);
1136 leaderActorContext.setLastApplied(1);
1137 leaderActorContext.getTermInformation().update(1, "leader");
1139 leader = new Leader(leaderActorContext);
// Distinctive payload version so the final assertion can tell it was taken from the reply.
1141 short payloadVersion = 5;
1142 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1144 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1146 assertEquals(RaftState.Leader, raftActorBehavior.state());
1148 assertEquals(2, leaderActorContext.getCommitIndex());
1150 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1151 leaderActor, ApplyJournalEntries.class);
1153 assertEquals(2, leaderActorContext.getLastApplied());
1155 assertEquals(2, applyJournalEntries.getToIndex());
1157 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
// Exactly one ApplyState is expected, for the newly committed entry at index 2.
1160 assertEquals(1,applyStateList.size());
1162 ApplyState applyState = applyStateList.get(0);
1164 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1166 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1167 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
// Checks that an AppendEntriesReply whose follower id the leader has no peer entry for leaves the
// behavior in the Leader state.
1171 public void testHandleAppendEntriesReplyUnknownFollower(){
1172 logStart("testHandleAppendEntriesReplyUnknownFollower");
// This context is created without any peer addresses.
1174 MockRaftActorContext leaderActorContext = createActorContext();
1176 leader = new Leader(leaderActorContext);
1178 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1180 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1182 assertEquals(RaftState.Leader, raftActorBehavior.state());
// Checks that the leader stays in the Leader state after a RequestVoteReply, whether the vote was
// granted or not.
1186 public void testHandleRequestVoteReply(){
1187 logStart("testHandleRequestVoteReply");
1189 MockRaftActorContext leaderActorContext = createActorContext();
1191 leader = new Leader(leaderActorContext);
1193 // Should be a no-op.
1194 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1195 new RequestVoteReply(1, true));
1197 assertEquals(RaftState.Leader, raftActorBehavior.state());
// Same check with the vote not granted.
1199 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1201 assertEquals(RaftState.Leader, raftActorBehavior.state());
// Checks that an IsolatedLeaderCheck handled by a leader created without peer addresses still yields
// a Leader behavior.
1205 public void testIsolatedLeaderCheckNoFollowers() {
1206 logStart("testIsolatedLeaderCheckNoFollowers");
1208 MockRaftActorContext leaderActorContext = createActorContext();
1210 leader = new Leader(leaderActorContext);
1211 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1212 Assert.assertTrue(behavior instanceof Leader);
// Runs IsolatedLeaderCheck with two followers in three situations: both active (stays Leader), one
// marked inactive (stays Leader), and both marked inactive (becomes IsolatedLeader).
1216 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1217 logStart("testIsolatedLeaderCheckTwoFollowers");
1219 new JavaTestKit(getSystem()) {{
// NOTE(review): both follower refs come from getTestActor(); presumably they are the same actor - confirm.
1221 ActorRef followerActor1 = getTestActor();
1222 ActorRef followerActor2 = getTestActor();
1224 MockRaftActorContext leaderActorContext = createActorContext();
1226 Map<String, String> peerAddresses = new HashMap<>();
1227 peerAddresses.put("follower-1", followerActor1.path().toString());
1228 peerAddresses.put("follower-2", followerActor2.path().toString());
1230 leaderActorContext.setPeerAddresses(peerAddresses);
1232 leader = new Leader(leaderActorContext);
// Case 1: both followers are marked active.
1234 leader.markFollowerActive("follower-1");
1235 leader.markFollowerActive("follower-2");
1236 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1237 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1238 behavior instanceof Leader);
1240 // kill 1 follower and verify if that got killed
1241 final JavaTestKit probe = new JavaTestKit(getSystem());
1242 probe.watch(followerActor1);
1243 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1244 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1245 assertEquals(termMsg1.getActor(), followerActor1);
// Case 2: follower-1 is marked inactive, follower-2 active.
1247 leader.markFollowerInActive("follower-1");
1248 leader.markFollowerActive("follower-2");
1249 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1250 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1251 behavior instanceof Leader);
1253 // kill 2nd follower and leader should change to Isolated leader
// NOTE(review): a PoisonPill is sent here with a null sender before the probe watches the actor, and
// another one is sent right after the watch - the first send looks redundant; confirm.
1254 followerActor2.tell(PoisonPill.getInstance(), null);
1255 probe.watch(followerActor2);
1256 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1257 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1258 assertEquals(termMsg2.getActor(), followerActor2);
// Case 3: follower-2 is now marked inactive as well.
1260 leader.markFollowerInActive("follower-2");
1261 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1262 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1263 behavior instanceof IsolatedLeader);
// Checks that handling an AppendEntriesReply makes the leader send the next log entry straight away:
// after the follower acknowledges entry 0, a new AppendEntries whose first entry has index 1 is expected
// without another SendHeartBeat being delivered.
1269 public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1270 logStart("testAppendEntryCallAtEndofAppendEntryReply");
1272 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// A fresh config object replaces the one already set on the context; only the isolated-leader check
// interval is set on it (the heartbeat override is left commented out).
1274 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1275 //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1276 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1278 leaderActorContext.setConfigParams(configParams);
1280 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1282 followerActorContext.setConfigParams(configParams);
1283 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
// Install a Follower behavior on followerActor.
1285 Follower follower = new Follower(followerActorContext);
1286 followerActor.underlyingActor().setBehavior(follower);
// Both sides start with emptied logs and commit index / last applied of -1.
1288 leaderActorContext.getReplicatedLog().removeFrom(0);
1289 leaderActorContext.setCommitIndex(-1);
1290 leaderActorContext.setLastApplied(-1);
1292 followerActorContext.getReplicatedLog().removeFrom(0);
1293 followerActorContext.setCommitIndex(-1);
1294 followerActorContext.setLastApplied(-1);
1296 leader = new Leader(leaderActorContext);
1298 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1299 leaderActor, AppendEntriesReply.class);
1301 leader.handleMessage(followerActor, appendEntriesReply);
1303 // Clear initial heartbeat messages
1305 leaderActor.underlyingActor().clear();
1306 followerActor.underlyingActor().clear();
// Give the leader a freshly built log with commit index and last applied at 1; the follower's log stays empty.
1309 leaderActorContext.setReplicatedLog(
1310 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1311 leaderActorContext.setCommitIndex(1);
1312 leaderActorContext.setLastApplied(1);
// Let one heartbeat interval elapse, then deliver SendHeartBeat to the leader by hand.
1314 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1315 TimeUnit.MILLISECONDS);
1317 leader.handleMessage(leaderActor, new SendHeartBeat());
1319 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1321 // Should send first log entry
1322 assertEquals(1, appendEntries.getLeaderCommit());
1323 assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1324 assertEquals(-1, appendEntries.getPrevLogIndex());
1326 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1328 assertEquals(1, appendEntriesReply.getLogLastTerm());
1329 assertEquals(0, appendEntriesReply.getLogLastIndex());
1331 followerActor.underlyingActor().clear();
// Hand the reply to the leader directly; no SendHeartBeat is delivered before the next expectation.
1333 leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1335 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1337 // Should send second log entry
1338 assertEquals(1, appendEntries.getLeaderCommit());
1339 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
// Feeds the leader a steady stream of successful AppendEntriesReply messages from follower 1 and then
// checks that leaderActor still collected more than one SendHeartBeat and that follower 2 received more
// than one AppendEntries in the meantime.
1345 public void testLaggingFollowerStarvation() throws Exception {
1346 logStart("testLaggingFollowerStarvation");
1347 new JavaTestKit(getSystem()) {{
1348 String leaderActorId = actorFactory.generateActorId("leader");
1349 String follower1ActorId = actorFactory.generateActorId("follower");
1350 String follower2ActorId = actorFactory.generateActorId("follower");
// These locals are created for this test and share names with identifiers used by the other tests.
1352 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1353 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1354 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1355 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1357 MockRaftActorContext leaderActorContext =
1358 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
// 200 ms heartbeat interval; the loop below sleeps for the same duration on every iteration.
1360 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1361 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1362 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1364 leaderActorContext.setConfigParams(configParams);
1366 leaderActorContext.setReplicatedLog(
1367 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1369 Map<String, String> peerAddresses = new HashMap<>();
1370 peerAddresses.put(follower1ActorId,
1371 follower1Actor.path().toString());
1372 peerAddresses.put(follower2ActorId,
1373 follower2Actor.path().toString());
1375 leaderActorContext.setPeerAddresses(peerAddresses);
1376 leaderActorContext.getTermInformation().update(1, leaderActorId);
1378 RaftActorBehavior leader = createBehavior(leaderActorContext);
// Install the behavior on the leader actor (a ForwardMessageToBehaviorActor).
1380 leaderActor.underlyingActor().setBehavior(leader);
// Five successful replies from follower 1, reporting last indexes 1 to 5, 200 ms apart.
1382 for(int i=1;i<6;i++) {
1383 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1384 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1385 assertTrue(newBehavior == leader);
1386 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1389 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1390 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1392 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1393 heartbeats.size() > 1);
1395 // Check if follower-2 got AppendEntries during this time and was not starved
1396 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1398 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1399 appendEntries.size() > 1);
// Runs the superclass check and additionally asserts that votedFor in the context's term information
// is null afterwards.
1405 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1406 ActorRef actorRef, RaftRPC rpc) throws Exception {
1407 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1408 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
// Config params variant whose election timeout interval and snapshot chunk size are fixed to the values
// passed to the constructor.
1411 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
// Election timeout interval, in milliseconds.
1413 private final long electionTimeOutIntervalMillis;
// Value reported by getSnapshotChunkSize().
1414 private final int snapshotChunkSize;
1416 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1418 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1419 this.snapshotChunkSize = snapshotChunkSize;
// Wraps the configured millisecond value in a FiniteDuration.
1423 public FiniteDuration getElectionTimeOutInterval() {
1424 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1428 public int getSnapshotChunkSize() {
1429 return snapshotChunkSize;