1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
29 import org.opendaylight.controller.cluster.raft.SerializationUtils;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
34 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
37 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
40 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
47 import scala.concurrent.duration.FiniteDuration;
49 public class LeaderTest extends AbstractLeaderTest {
// Follower id used in the simulated replies and in leader.getFollower(...) lookups.
51 static final String FOLLOWER_ID = "follower";
// Test actor the tests pass as sender of SendHeartBeat/Replicate messages; its collected
// messages are also queried (e.g. for ApplyState and CaptureSnapshot).
53 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
54 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
// Test actor standing in for the follower; tests inspect and clear the messages it collected.
56 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
57 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
// Behavior under test; each test assigns a new Leader instance to it.
59 private Leader leader;
// Cleanup hook. NOTE(review): only the signature is visible in this listing (the embedded
// numbering jumps from 63 to 72), so the body cannot be described here - confirm against the full file.
63 public void tearDown() throws Exception {
// Verifies that a message the Leader does not recognise (here a plain String) still yields a
// Leader behavior from handleMessage.
72 public void testHandleMessageForUnknownMessage() throws Exception {
73 logStart("testHandleMessageForUnknownMessage");
75 leader = new Leader(createActorContext());
77 // handleMessage should return a Leader behavior when it receives an unknown message
79 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
80 Assert.assertTrue(behavior instanceof Leader);
// Verifies the AppendEntries a Leader sends its follower: an immediate one with no entries when
// the Leader is created, then - once the follower's reply has been simulated and a SendHeartBeat
// is handled - one that carries the last log entry.
// NOTE(review): `term` is used below but its declaration is not visible in this listing.
84 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
85 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
87 MockRaftActorContext actorContext = createActorContextWithFollower();
90 actorContext.getTermInformation().update(term, "");
92 leader = new Leader(actorContext);
94 // Leader should send an immediate heartbeat with no entries as follower is inactive.
95 long lastIndex = actorContext.getReplicatedLog().lastIndex();
96 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
97 assertEquals("getTerm", term, appendEntries.getTerm());
98 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
99 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
100 assertEquals("Entries size", 0, appendEntries.getEntries().size());
102 // The follower would normally reply - simulate that explicitly here.
// The reply passes lastIndex - 1 (presumably the follower's last log index - confirm against
// AppendEntriesReply); the assertions further down expect the next AppendEntries to carry the
// entry at lastIndex.
103 leader.handleMessage(followerActor, new AppendEntriesReply(
104 FOLLOWER_ID, term, true, lastIndex - 1, term));
105 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the messages collected so far so the next lookup only sees the post-heartbeat AppendEntries.
107 followerActor.underlyingActor().clear();
109 // Sleep for the heartbeat interval so AppendEntries is sent.
110 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
111 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
113 leader.handleMessage(leaderActor, new SendHeartBeat());
115 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
116 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
117 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
118 assertEquals("Entries size", 1, appendEntries.getEntries().size());
119 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
120 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
// Helper: creates a MockPayload("foo") and a new MockReplicatedLogEntry, appends that entry to the
// context's replicated log, then hands the leader a Replicate message for it (sender: leaderActor)
// and returns the behavior handleMessage produced.
// NOTE(review): the constructor-argument line (embedded line 127) is missing from this listing, so
// how `index`, the term and the payload are passed to the entry cannot be confirmed here.
124 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
125 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
126 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
128 actorContext.getReplicatedLog().append(newEntry);
129 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
// Verifies that handling a Replicate message makes the Leader send the follower an AppendEntries
// carrying exactly the newly appended entry (index, term and payload), while the behavior stays Leader.
// NOTE(review): `term` is used below but its declaration is not visible in this listing.
133 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
134 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
136 MockRaftActorContext actorContext = createActorContextWithFollower();
139 actorContext.getTermInformation().update(term, "");
141 leader = new Leader(actorContext);
143 // Leader will send an immediate heartbeat - ignore it.
144 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
146 // The follower would normally reply - simulate that explicitly here.
147 long lastIndex = actorContext.getReplicatedLog().lastIndex();
148 leader.handleMessage(followerActor, new AppendEntriesReply(
149 FOLLOWER_ID, term, true, lastIndex, term));
150 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the initial heartbeat so only the AppendEntries triggered by Replicate is inspected.
152 followerActor.underlyingActor().clear();
154 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
155 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
156 1, lastIndex + 1, payload);
157 actorContext.getReplicatedLog().append(newEntry);
// Replicate the entry appended above directly. Calling sendReplicate() here would append a
// second entry with the same index to the log before sending the Replicate message.
158 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
160 // State should not change
161 assertTrue(raftBehavior instanceof Leader);
163 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
164 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
165 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
166 assertEquals("Entries size", 1, appendEntries.getEntries().size());
167 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
168 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
169 assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
// Verifies that several Replicate messages handled back to back, with no follower reply in between
// and a heartbeat interval (5 s) that has not elapsed, result in a single AppendEntries only.
// NOTE(review): `term` is used below but its declaration is not visible in this listing.
173 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
// Log this test's own name (previously another test's name was logged here).
174 logStart("testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent");
176 MockRaftActorContext actorContext = createActorContextWithFollower();
// Long heartbeat interval so that no heartbeat-driven AppendEntries interferes with the count.
177 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
179 public FiniteDuration getHeartBeatInterval() {
180 return FiniteDuration.apply(5, TimeUnit.SECONDS);
185 actorContext.getTermInformation().update(term, "");
187 leader = new Leader(actorContext);
189 // Leader will send an immediate heartbeat - ignore it.
190 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
192 // The follower would normally reply - simulate that explicitly here.
193 long lastIndex = actorContext.getReplicatedLog().lastIndex();
194 leader.handleMessage(followerActor, new AppendEntriesReply(
195 FOLLOWER_ID, term, true, lastIndex, term));
196 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
198 followerActor.underlyingActor().clear();
200 for(int i=0;i<5;i++) {
201 sendReplicate(actorContext, lastIndex+i+1);
204 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
205 // We expect only 1 message to be sent because of two reasons,
206 // - an append entries reply was not received
207 // - the heartbeat interval has not expired
208 // In this scenario if multiple messages are sent they would likely be duplicates
209 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
213 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
214 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
216 MockRaftActorContext actorContext = createActorContextWithFollower();
217 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
219 public FiniteDuration getHeartBeatInterval() {
220 return FiniteDuration.apply(5, TimeUnit.SECONDS);
225 actorContext.getTermInformation().update(term, "");
227 leader = new Leader(actorContext);
229 // Leader will send an immediate heartbeat - ignore it.
230 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
232 // The follower would normally reply - simulate that explicitly here.
233 long lastIndex = actorContext.getReplicatedLog().lastIndex();
234 leader.handleMessage(followerActor, new AppendEntriesReply(
235 FOLLOWER_ID, term, true, lastIndex, term));
236 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
238 followerActor.underlyingActor().clear();
240 for(int i=0;i<3;i++) {
241 sendReplicate(actorContext, lastIndex+i+1);
242 leader.handleMessage(followerActor, new AppendEntriesReply(
243 FOLLOWER_ID, term, true, lastIndex + i + 1, term));
247 for(int i=3;i<5;i++) {
248 sendReplicate(actorContext, lastIndex + i + 1);
251 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
252 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
253 // get sent to the follower - but not the 5th
254 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
256 for(int i=0;i<4;i++) {
257 long expected = allMessages.get(i).getEntries().get(0).getIndex();
258 assertEquals(expected, i+2);
// Verifies that an entry which was replicated but never acknowledged is sent again on the next
// heartbeat: after one Replicate and one SendHeartBeat (handled after the 500 ms interval has
// elapsed) two AppendEntries must have been collected, both carrying the entry at lastIndex + 1.
// NOTE(review): `term` is used below but its declaration is not visible in this listing.
263 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
264 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
266 MockRaftActorContext actorContext = createActorContextWithFollower();
267 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
269 public FiniteDuration getHeartBeatInterval() {
270 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
275 actorContext.getTermInformation().update(term, "");
277 leader = new Leader(actorContext);
279 // Leader will send an immediate heartbeat - ignore it.
280 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
282 // The follower would normally reply - simulate that explicitly here.
283 long lastIndex = actorContext.getReplicatedLog().lastIndex();
284 leader.handleMessage(followerActor, new AppendEntriesReply(
285 FOLLOWER_ID, term, true, lastIndex, term));
286 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
288 followerActor.underlyingActor().clear();
290 sendReplicate(actorContext, lastIndex+1);
292 // Wait slightly longer than heartbeat duration
293 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
295 leader.handleMessage(leaderActor, new SendHeartBeat());
297 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
298 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
300 assertEquals(1, allMessages.get(0).getEntries().size());
301 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
302 assertEquals(1, allMessages.get(1).getEntries().size());
// Check the second (heartbeat-driven) message; previously message 0 was checked twice.
303 assertEquals(lastIndex+1, allMessages.get(1).getEntries().get(0).getIndex());
// Verifies that every SendHeartBeat handled after the (100 ms) heartbeat interval has elapsed
// results in an AppendEntries to the follower: three 150 ms sleeps, each followed by a
// SendHeartBeat, must yield three collected messages.
// NOTE(review): `term` is used below but its declaration is not visible in this listing.
308 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
309 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
311 MockRaftActorContext actorContext = createActorContextWithFollower();
312 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
314 public FiniteDuration getHeartBeatInterval() {
315 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
320 actorContext.getTermInformation().update(term, "");
322 leader = new Leader(actorContext);
324 // Leader will send an immediate heartbeat - ignore it.
325 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
327 // The follower would normally reply - simulate that explicitly here.
328 long lastIndex = actorContext.getReplicatedLog().lastIndex();
329 leader.handleMessage(followerActor, new AppendEntriesReply(
330 FOLLOWER_ID, term, true, lastIndex, term));
331 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Start counting from an empty collector.
333 followerActor.underlyingActor().clear();
// Each sleep (150 ms) is longer than the configured heartbeat interval (100 ms).
335 for(int i=0;i<3;i++) {
336 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
337 leader.handleMessage(leaderActor, new SendHeartBeat());
340 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
341 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
// Verifies that a Replicate handled right after a heartbeat is still sent to the follower: the
// SendHeartBeat must produce an AppendEntries with no entries and the following Replicate one
// with a single entry.
// NOTE(review): `term` is used below but its declaration is not visible in this listing.
345 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
346 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
348 MockRaftActorContext actorContext = createActorContextWithFollower();
349 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
351 public FiniteDuration getHeartBeatInterval() {
352 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
357 actorContext.getTermInformation().update(term, "");
359 leader = new Leader(actorContext);
361 // Leader will send an immediate heartbeat - ignore it.
362 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
364 // The follower would normally reply - simulate that explicitly here.
365 long lastIndex = actorContext.getReplicatedLog().lastIndex();
366 leader.handleMessage(followerActor, new AppendEntriesReply(
367 FOLLOWER_ID, term, true, lastIndex, term));
368 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
370 followerActor.underlyingActor().clear();
// Sleep past the 100 ms heartbeat interval, then heartbeat and replicate back to back.
372 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
373 leader.handleMessage(leaderActor, new SendHeartBeat());
374 sendReplicate(actorContext, lastIndex+1);
376 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
377 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
// Message 0 is the heartbeat (no entries), message 1 the replication (one entry).
379 assertEquals(0, allMessages.get(0).getEntries().size());
380 assertEquals(1, allMessages.get(1).getEntries().size());
// Verifies Replicate handling for a Leader created from a context without followers: the commit
// index must move to the new entry's index, and ApplyState messages for the entries after
// lastApplied (0) must be collected by the leader actor, the last one carrying the new entry's
// data and the "state-id" identifier.
385 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
386 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
388 MockRaftActorContext actorContext = createActorContext();
390 leader = new Leader(actorContext);
392 actorContext.setLastApplied(0);
394 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
395 long term = actorContext.getTermInformation().getCurrentTerm();
396 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
397 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
399 actorContext.getReplicatedLog().append(newEntry);
401 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
402 new Replicate(leaderActor, "state-id", newEntry));
404 // State should not change
405 assertTrue(raftBehavior instanceof Leader);
407 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
409 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
410 // one since lastApplied state is 0.
// The expected count is written as newLogIndex: with lastApplied at 0, entries 1..newLogIndex
// are the ones expected to be applied.
411 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
412 leaderActor, ApplyState.class);
413 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
// ApplyState i is expected to refer to log index i + 1.
415 for(int i = 0; i <= newLogIndex - 1; i++ ) {
416 ApplyState applyState = applyStateList.get(i);
417 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
418 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
421 ApplyState last = applyStateList.get((int) newLogIndex - 1);
422 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
423 assertEquals("getIdentifier", "state-id", last.getIdentifier());
// Verifies heartbeat handling while a snapshot install to the follower is in progress: with a
// chunk outstanding (no InstallSnapshotReply yet) a SendHeartBeat must yield an AppendEntries
// with no entries; once the chunk is marked as sent successfully, the next SendHeartBeat must
// yield an InstallSnapshot whose lastIncludedIndex equals the snapshot index.
427 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
428 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
430 MockRaftActorContext actorContext = createActorContextWithFollower();
// Snapshot content that is serialized via toByteString(...) further down.
432 Map<String, String> leadersSnapshot = new HashMap<>();
433 leadersSnapshot.put("1", "A");
434 leadersSnapshot.put("2", "B");
435 leadersSnapshot.put("3", "C");
438 actorContext.getReplicatedLog().removeFrom(0);
440 final int followersLastIndex = 2;
441 final int snapshotIndex = 3;
442 final int newEntryIndex = 4;
443 final int snapshotTerm = 1;
444 final int currentTerm = 2;
446 // set the snapshot variables in replicatedlog
447 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
448 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
449 actorContext.setCommitIndex(followersLastIndex);
450 //set follower timeout to 2 mins, helps during debugging
451 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
453 leader = new Leader(actorContext);
// NOTE(review): `entry` is not referenced by any later line visible in this listing - confirm
// whether it is still needed.
456 ReplicatedLogImplEntry entry =
457 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
458 new MockRaftActorContext.MockPayload("D"));
460 //update follower timestamp
461 leader.markFollowerActive(FOLLOWER_ID);
// Register an in-progress snapshot transfer for the follower directly on the leader.
463 ByteString bs = toByteString(leadersSnapshot);
464 leader.setSnapshot(Optional.of(bs));
465 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
466 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
468 //send first chunk and no InstallSnapshotReply received yet
470 fts.incrementChunkIndex();
472 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
473 TimeUnit.MILLISECONDS);
475 leader.handleMessage(leaderActor, new SendHeartBeat());
477 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
479 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
481 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
483 //InstallSnapshotReply received
484 fts.markSendStatus(true);
486 leader.handleMessage(leaderActor, new SendHeartBeat());
488 InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor,
489 InstallSnapshot.SERIALIZABLE_CLASS);
491 InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
493 assertEquals(snapshotIndex, is.getLastIncludedIndex());
// Verifies that replicating a new entry while the follower's last index (2) is below the leader's
// snapshot index (3) keeps the behavior Leader and results in a CaptureSnapshot message being
// collected by the leader actor.
497 public void testSendAppendEntriesSnapshotScenario() throws Exception {
498 logStart("testSendAppendEntriesSnapshotScenario");
500 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): `leadersSnapshot` is populated but not read by any later line visible in this listing.
502 Map<String, String> leadersSnapshot = new HashMap<>();
503 leadersSnapshot.put("1", "A");
504 leadersSnapshot.put("2", "B");
505 leadersSnapshot.put("3", "C");
508 actorContext.getReplicatedLog().removeFrom(0);
510 final int followersLastIndex = 2;
511 final int snapshotIndex = 3;
512 final int newEntryIndex = 4;
513 final int snapshotTerm = 1;
514 final int currentTerm = 2;
516 // set the snapshot variables in replicatedlog
517 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
518 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
519 actorContext.setCommitIndex(followersLastIndex);
521 leader = new Leader(actorContext);
523 // Leader will send an immediate heartbeat - ignore it.
524 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
527 ReplicatedLogImplEntry entry =
528 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
529 new MockRaftActorContext.MockPayload("D"));
531 //update follower timestamp
532 leader.markFollowerActive(FOLLOWER_ID);
534 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
535 RaftActorBehavior raftBehavior = leader.handleMessage(
536 leaderActor, new Replicate(null, "state-id", entry));
538 assertTrue(raftBehavior instanceof Leader);
540 MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
// Verifies snapshot-capture initiation when the leader holds no snapshot: a Replicate must
// produce a CaptureSnapshot flagged as install-snapshot-initiated with the expected last-applied
// and last index/term values, and a second Replicate while that capture is pending must not
// produce another CaptureSnapshot.
544 public void testInitiateInstallSnapshot() throws Exception {
545 logStart("testInitiateInstallSnapshot");
547 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): `leadersSnapshot` is populated but not read by any later line visible in this listing.
549 Map<String, String> leadersSnapshot = new HashMap<>();
550 leadersSnapshot.put("1", "A");
551 leadersSnapshot.put("2", "B");
552 leadersSnapshot.put("3", "C");
555 actorContext.getReplicatedLog().removeFrom(0);
557 final int followersLastIndex = 2;
558 final int snapshotIndex = 3;
559 final int newEntryIndex = 4;
560 final int snapshotTerm = 1;
561 final int currentTerm = 2;
563 // set the snapshot variables in replicatedlog
564 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
565 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
566 actorContext.setLastApplied(3);
567 actorContext.setCommitIndex(followersLastIndex);
569 leader = new Leader(actorContext);
571 // Leader will send an immediate heartbeat - ignore it.
572 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
574 // set the snapshot as absent and check if capture-snapshot is invoked.
575 leader.setSnapshot(Optional.<ByteString>absent());
578 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
579 new MockRaftActorContext.MockPayload("D"));
581 actorContext.getReplicatedLog().append(entry);
583 //update follower timestamp
584 leader.markFollowerActive(FOLLOWER_ID);
586 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
588 CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
// Expected values mirror the setup above: lastApplied 3 / snapshotTerm 1, new entry index 4 / currentTerm 2.
590 assertTrue(cs.isInstallSnapshotInitiated());
591 assertEquals(3, cs.getLastAppliedIndex());
592 assertEquals(1, cs.getLastAppliedTerm());
593 assertEquals(4, cs.getLastIndex());
594 assertEquals(2, cs.getLastTerm());
596 // if an initiate is started again while the first is in progress, it shouldn't initiate another capture
597 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
599 List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
600 assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
// Verifies that handling SendInstallSnapshot keeps the behavior Leader and sends the follower an
// InstallSnapshot with non-null data, the log's snapshot index/term as lastIncludedIndex/Term
// and the current term.
604 public void testInstallSnapshot() throws Exception {
605 logStart("testInstallSnapshot");
607 MockRaftActorContext actorContext = createActorContextWithFollower();
609 Map<String, String> leadersSnapshot = new HashMap<>();
610 leadersSnapshot.put("1", "A");
611 leadersSnapshot.put("2", "B");
612 leadersSnapshot.put("3", "C");
615 actorContext.getReplicatedLog().removeFrom(0);
617 final int followersLastIndex = 2;
618 final int snapshotIndex = 3;
619 final int snapshotTerm = 1;
620 final int currentTerm = 2;
622 // set the snapshot variables in replicatedlog
623 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
624 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
625 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
626 actorContext.setCommitIndex(followersLastIndex);
628 leader = new Leader(actorContext);
630 // Ignore initial heartbeat.
631 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
633 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
634 new SendInstallSnapshot(toByteString(leadersSnapshot)));
636 assertTrue(raftBehavior instanceof Leader);
638 // check if installsnapshot gets called with the correct values.
// The follower collects the serialized form; convert it back before asserting.
640 InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable(
641 MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class));
643 assertNotNull(installSnapshot.getData());
644 assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
645 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
647 assertEquals(currentTerm, installSnapshot.getTerm());
// Verifies handling of a successful InstallSnapshotReply for the last chunk: the behavior stays
// Leader, the follower's snapshot-transfer state is dropped (followerSnapshotSize 0) while the
// follower itself stays tracked (followerLogSize 1), and its match/next index move to
// snapshotIndex / snapshotIndex + 1.
651 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
652 logStart("testHandleInstallSnapshotReplyLastChunk");
654 MockRaftActorContext actorContext = createActorContextWithFollower();
656 final int followersLastIndex = 2;
657 final int snapshotIndex = 3;
658 final int snapshotTerm = 1;
659 final int currentTerm = 2;
661 actorContext.setCommitIndex(followersLastIndex);
663 leader = new Leader(actorContext);
665 // Ignore initial heartbeat.
666 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
668 Map<String, String> leadersSnapshot = new HashMap<>();
669 leadersSnapshot.put("1", "A");
670 leadersSnapshot.put("2", "B");
671 leadersSnapshot.put("3", "C");
673 // set the snapshot variables in replicatedlog
675 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
676 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
677 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
// Register a snapshot transfer for the follower and advance it to its last chunk.
679 ByteString bs = toByteString(leadersSnapshot);
680 leader.setSnapshot(Optional.of(bs));
681 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
682 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
683 while(!fts.isLastChunk(fts.getChunkIndex())) {
685 fts.incrementChunkIndex();
689 actorContext.getReplicatedLog().removeFrom(0);
691 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
692 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
694 assertTrue(raftBehavior instanceof Leader);
696 assertEquals(0, leader.followerSnapshotSize());
697 assertEquals(1, leader.followerLogSize());
698 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
// The match-index check is stated once; an identical second assertion was removed.
700 assertEquals(snapshotIndex, fli.getMatchIndex());
702 assertEquals(snapshotIndex + 1, fli.getNextIndex());
// Verifies chunk-by-chunk snapshot transfer driven by InstallSnapshotReply: after
// SendInstallSnapshot the follower receives chunk 1 of 3, each successful reply triggers the next
// chunk, and a reply to the final chunk must not trigger any further InstallSnapshot.
706 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
707 logStart("testSendSnapshotfromInstallSnapshotReply");
709 MockRaftActorContext actorContext = createActorContextWithFollower();
711 final int followersLastIndex = 2;
712 final int snapshotIndex = 3;
713 final int snapshotTerm = 1;
714 final int currentTerm = 2;
// NOTE(review): the overridden chunk size is not visible in this listing; the assertions below
// expect the snapshot to be split into 3 chunks.
716 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
718 public int getSnapshotChunkSize() {
// Long intervals - presumably so that timer-driven messages do not interfere with the test (confirm).
722 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
723 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
725 actorContext.setConfigParams(configParams);
726 actorContext.setCommitIndex(followersLastIndex);
728 leader = new Leader(actorContext);
730 Map<String, String> leadersSnapshot = new HashMap<>();
731 leadersSnapshot.put("1", "A");
732 leadersSnapshot.put("2", "B");
733 leadersSnapshot.put("3", "C");
735 // set the snapshot variables in replicatedlog
736 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
737 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
738 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
740 ByteString bs = toByteString(leadersSnapshot);
741 leader.setSnapshot(Optional.of(bs));
743 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
745 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
746 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
748 assertEquals(1, installSnapshot.getChunkIndex());
749 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 1; chunk 2 is expected next.
751 followerActor.underlyingActor().clear();
752 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
753 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
755 installSnapshot = MessageCollectorActor.expectFirstMatching(
756 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
758 assertEquals(2, installSnapshot.getChunkIndex());
759 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 2; another InstallSnapshot (the final chunk) is expected.
761 followerActor.underlyingActor().clear();
762 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
763 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
765 installSnapshot = MessageCollectorActor.expectFirstMatching(
766 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
768 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
769 followerActor.underlyingActor().clear();
770 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
771 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
// getFirstMatching (unlike expectFirstMatching) is used so that "nothing collected" can be asserted as null.
773 installSnapshot = MessageCollectorActor.getFirstMatching(
774 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
776 Assert.assertNull(installSnapshot);
// Verifies recovery from a failed InstallSnapshotReply carrying chunk index -1: after the
// heartbeat interval has passed and a SendHeartBeat is handled, the leader must send chunk 1 of 3
// to the follower again.
781 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
782 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
784 MockRaftActorContext actorContext = createActorContextWithFollower();
786 final int followersLastIndex = 2;
787 final int snapshotIndex = 3;
788 final int snapshotTerm = 1;
789 final int currentTerm = 2;
// NOTE(review): the overridden chunk size is not visible in this listing; the assertions below
// expect the snapshot to be split into 3 chunks.
791 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
793 public int getSnapshotChunkSize() {
798 actorContext.setCommitIndex(followersLastIndex);
800 leader = new Leader(actorContext);
802 Map<String, String> leadersSnapshot = new HashMap<>();
803 leadersSnapshot.put("1", "A");
804 leadersSnapshot.put("2", "B");
805 leadersSnapshot.put("3", "C");
807 // set the snapshot variables in replicatedlog
808 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
809 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
810 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
812 ByteString bs = toByteString(leadersSnapshot);
813 leader.setSnapshot(Optional.of(bs));
815 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
817 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
818 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
820 assertEquals(1, installSnapshot.getChunkIndex());
821 assertEquals(3, installSnapshot.getTotalChunks());
823 followerActor.underlyingActor().clear();
// Simulate the follower rejecting the chunk: chunk index -1, success flag false.
825 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
826 FOLLOWER_ID, -1, false));
828 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
829 TimeUnit.MILLISECONDS);
831 leader.handleMessage(leaderActor, new SendHeartBeat());
833 installSnapshot = MessageCollectorActor.expectFirstMatching(
834 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
// The first chunk is expected again rather than chunk 2.
836 assertEquals(1, installSnapshot.getChunkIndex());
837 assertEquals(3, installSnapshot.getTotalChunks());
841 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
842 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
844 MockRaftActorContext actorContext = createActorContextWithFollower();
846 final int followersLastIndex = 2;
847 final int snapshotIndex = 3;
848 final int snapshotTerm = 1;
849 final int currentTerm = 2;
851 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
853 public int getSnapshotChunkSize() {
858 actorContext.setCommitIndex(followersLastIndex);
860 leader = new Leader(actorContext);
862 Map<String, String> leadersSnapshot = new HashMap<>();
863 leadersSnapshot.put("1", "A");
864 leadersSnapshot.put("2", "B");
865 leadersSnapshot.put("3", "C");
867 // set the snapshot variables in replicatedlog
868 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
869 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
870 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
872 ByteString bs = toByteString(leadersSnapshot);
873 leader.setSnapshot(Optional.of(bs));
875 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
877 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
878 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
880 assertEquals(1, installSnapshot.getChunkIndex());
881 assertEquals(3, installSnapshot.getTotalChunks());
882 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
884 int hashCode = installSnapshot.getData().hashCode();
886 followerActor.underlyingActor().clear();
888 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
889 FOLLOWER_ID, 1, true));
891 installSnapshot = MessageCollectorActor.expectFirstMatching(
892 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
894 assertEquals(2, installSnapshot.getChunkIndex());
895 assertEquals(3, installSnapshot.getTotalChunks());
896 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
900 public void testFollowerToSnapshotLogic() {
901 logStart("testFollowerToSnapshotLogic");
903 MockRaftActorContext actorContext = createActorContext();
905 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
907 public int getSnapshotChunkSize() {
912 leader = new Leader(actorContext);
914 Map<String, String> leadersSnapshot = new HashMap<>();
915 leadersSnapshot.put("1", "A");
916 leadersSnapshot.put("2", "B");
917 leadersSnapshot.put("3", "C");
919 ByteString bs = toByteString(leadersSnapshot);
920 byte[] barray = bs.toByteArray();
922 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
923 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
925 assertEquals(bs.size(), barray.length);
928 for (int i=0; i < barray.length; i = i + 50) {
932 if (i + 50 > barray.length) {
936 ByteString chunk = fts.getNextChunk();
937 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
938 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
940 fts.markSendStatus(true);
941 if (!fts.isLastChunk(chunkIndex)) {
942 fts.incrementChunkIndex();
946 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
949 @Override protected RaftActorBehavior createBehavior(
950 RaftActorContext actorContext) {
951 return new Leader(actorContext);
955 protected MockRaftActorContext createActorContext() {
956 return createActorContext(leaderActor);
960 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
961 return createActorContext("leader", actorRef);
964 private MockRaftActorContext createActorContextWithFollower() {
965 MockRaftActorContext actorContext = createActorContext();
966 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
967 followerActor.path().toString()).build());
971 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
972 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
973 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
974 configParams.setElectionTimeoutFactor(100000);
975 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
976 context.setConfigParams(configParams);
981 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
982 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
984 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
986 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
988 Follower follower = new Follower(followerActorContext);
989 followerActor.underlyingActor().setBehavior(follower);
991 Map<String, String> peerAddresses = new HashMap<>();
992 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
994 leaderActorContext.setPeerAddresses(peerAddresses);
996 leaderActorContext.getReplicatedLog().removeFrom(0);
999 leaderActorContext.setReplicatedLog(
1000 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1002 leaderActorContext.setCommitIndex(1);
1004 followerActorContext.getReplicatedLog().removeFrom(0);
1006 // follower too has the exact same log entries and has the same commit index
1007 followerActorContext.setReplicatedLog(
1008 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1010 followerActorContext.setCommitIndex(1);
1012 leader = new Leader(leaderActorContext);
1014 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1016 assertEquals(1, appendEntries.getLeaderCommit());
1017 assertEquals(0, appendEntries.getEntries().size());
1018 assertEquals(0, appendEntries.getPrevLogIndex());
1020 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1021 leaderActor, AppendEntriesReply.class);
1023 assertEquals(2, appendEntriesReply.getLogLastIndex());
1024 assertEquals(1, appendEntriesReply.getLogLastTerm());
1026 // follower returns its next index
1027 assertEquals(2, appendEntriesReply.getLogLastIndex());
1028 assertEquals(1, appendEntriesReply.getLogLastTerm());
1034 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1035 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1037 MockRaftActorContext leaderActorContext = createActorContext();
1039 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1041 Follower follower = new Follower(followerActorContext);
1042 followerActor.underlyingActor().setBehavior(follower);
1044 Map<String, String> peerAddresses = new HashMap<>();
1045 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1047 leaderActorContext.setPeerAddresses(peerAddresses);
1049 leaderActorContext.getReplicatedLog().removeFrom(0);
1051 leaderActorContext.setReplicatedLog(
1052 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1054 leaderActorContext.setCommitIndex(1);
1056 followerActorContext.getReplicatedLog().removeFrom(0);
1058 followerActorContext.setReplicatedLog(
1059 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1061 // follower has the same log entries but its commit index > leaders commit index
1062 followerActorContext.setCommitIndex(2);
1064 leader = new Leader(leaderActorContext);
1066 // Initial heartbeat
1067 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1069 assertEquals(1, appendEntries.getLeaderCommit());
1070 assertEquals(0, appendEntries.getEntries().size());
1071 assertEquals(0, appendEntries.getPrevLogIndex());
1073 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1074 leaderActor, AppendEntriesReply.class);
1076 assertEquals(2, appendEntriesReply.getLogLastIndex());
1077 assertEquals(1, appendEntriesReply.getLogLastTerm());
1079 leaderActor.underlyingActor().setBehavior(follower);
1080 leader.handleMessage(followerActor, appendEntriesReply);
1082 leaderActor.underlyingActor().clear();
1083 followerActor.underlyingActor().clear();
1085 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1086 TimeUnit.MILLISECONDS);
1088 leader.handleMessage(leaderActor, new SendHeartBeat());
1090 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1092 assertEquals(2, appendEntries.getLeaderCommit());
1093 assertEquals(0, appendEntries.getEntries().size());
1094 assertEquals(2, appendEntries.getPrevLogIndex());
1096 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1098 assertEquals(2, appendEntriesReply.getLogLastIndex());
1099 assertEquals(1, appendEntriesReply.getLogLastTerm());
1101 assertEquals(2, followerActorContext.getCommitIndex());
1107 public void testHandleAppendEntriesReplyFailure(){
1108 logStart("testHandleAppendEntriesReplyFailure");
1110 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1112 leader = new Leader(leaderActorContext);
1114 // Send initial heartbeat reply with last index.
1115 leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
1117 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1118 assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
1120 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
1122 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1124 assertEquals(RaftState.Leader, raftActorBehavior.state());
1126 assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
1130 public void testHandleAppendEntriesReplySuccess() throws Exception {
1131 logStart("testHandleAppendEntriesReplySuccess");
1133 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1135 leaderActorContext.setReplicatedLog(
1136 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1138 leaderActorContext.setCommitIndex(1);
1139 leaderActorContext.setLastApplied(1);
1140 leaderActorContext.getTermInformation().update(1, "leader");
1142 leader = new Leader(leaderActorContext);
1144 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
1146 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1148 assertEquals(RaftState.Leader, raftActorBehavior.state());
1150 assertEquals(2, leaderActorContext.getCommitIndex());
1152 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1153 leaderActor, ApplyJournalEntries.class);
1155 assertEquals(2, leaderActorContext.getLastApplied());
1157 assertEquals(2, applyJournalEntries.getToIndex());
1159 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1162 assertEquals(1,applyStateList.size());
1164 ApplyState applyState = applyStateList.get(0);
1166 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1170 public void testHandleAppendEntriesReplyUnknownFollower(){
1171 logStart("testHandleAppendEntriesReplyUnknownFollower");
1173 MockRaftActorContext leaderActorContext = createActorContext();
1175 leader = new Leader(leaderActorContext);
1177 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
1179 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1181 assertEquals(RaftState.Leader, raftActorBehavior.state());
1185 public void testHandleRequestVoteReply(){
1186 logStart("testHandleRequestVoteReply");
1188 MockRaftActorContext leaderActorContext = createActorContext();
1190 leader = new Leader(leaderActorContext);
1192 // Should be a no-op.
1193 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1194 new RequestVoteReply(1, true));
1196 assertEquals(RaftState.Leader, raftActorBehavior.state());
1198 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1200 assertEquals(RaftState.Leader, raftActorBehavior.state());
1204 public void testIsolatedLeaderCheckNoFollowers() {
1205 logStart("testIsolatedLeaderCheckNoFollowers");
1207 MockRaftActorContext leaderActorContext = createActorContext();
1209 leader = new Leader(leaderActorContext);
1210 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1211 Assert.assertTrue(behavior instanceof Leader);
1215 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1216 logStart("testIsolatedLeaderCheckTwoFollowers");
1218 new JavaTestKit(getSystem()) {{
1220 ActorRef followerActor1 = getTestActor();
1221 ActorRef followerActor2 = getTestActor();
1223 MockRaftActorContext leaderActorContext = createActorContext();
1225 Map<String, String> peerAddresses = new HashMap<>();
1226 peerAddresses.put("follower-1", followerActor1.path().toString());
1227 peerAddresses.put("follower-2", followerActor2.path().toString());
1229 leaderActorContext.setPeerAddresses(peerAddresses);
1231 leader = new Leader(leaderActorContext);
1233 leader.markFollowerActive("follower-1");
1234 leader.markFollowerActive("follower-2");
1235 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1236 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1237 behavior instanceof Leader);
1239 // kill 1 follower and verify if that got killed
1240 final JavaTestKit probe = new JavaTestKit(getSystem());
1241 probe.watch(followerActor1);
1242 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1243 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1244 assertEquals(termMsg1.getActor(), followerActor1);
1246 leader.markFollowerInActive("follower-1");
1247 leader.markFollowerActive("follower-2");
1248 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1249 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1250 behavior instanceof Leader);
1252 // kill 2nd follower and leader should change to Isolated leader
1253 followerActor2.tell(PoisonPill.getInstance(), null);
1254 probe.watch(followerActor2);
1255 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1256 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1257 assertEquals(termMsg2.getActor(), followerActor2);
1259 leader.markFollowerInActive("follower-2");
1260 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1261 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1262 behavior instanceof IsolatedLeader);
1268 public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1269 logStart("testAppendEntryCallAtEndofAppendEntryReply");
1271 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1273 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1274 //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1275 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1277 leaderActorContext.setConfigParams(configParams);
1279 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1281 followerActorContext.setConfigParams(configParams);
1283 Follower follower = new Follower(followerActorContext);
1284 followerActor.underlyingActor().setBehavior(follower);
1286 leaderActorContext.getReplicatedLog().removeFrom(0);
1287 leaderActorContext.setCommitIndex(-1);
1288 leaderActorContext.setLastApplied(-1);
1290 followerActorContext.getReplicatedLog().removeFrom(0);
1291 followerActorContext.setCommitIndex(-1);
1292 followerActorContext.setLastApplied(-1);
1294 leader = new Leader(leaderActorContext);
1296 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1297 leaderActor, AppendEntriesReply.class);
1299 leader.handleMessage(followerActor, appendEntriesReply);
1301 // Clear initial heartbeat messages
1303 leaderActor.underlyingActor().clear();
1304 followerActor.underlyingActor().clear();
1307 leaderActorContext.setReplicatedLog(
1308 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1309 leaderActorContext.setCommitIndex(1);
1310 leaderActorContext.setLastApplied(1);
1312 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1313 TimeUnit.MILLISECONDS);
1315 leader.handleMessage(leaderActor, new SendHeartBeat());
1317 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1319 // Should send first log entry
1320 assertEquals(1, appendEntries.getLeaderCommit());
1321 assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1322 assertEquals(-1, appendEntries.getPrevLogIndex());
1324 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1326 assertEquals(1, appendEntriesReply.getLogLastTerm());
1327 assertEquals(0, appendEntriesReply.getLogLastIndex());
1329 followerActor.underlyingActor().clear();
1331 leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1333 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1335 // Should send second log entry
1336 assertEquals(1, appendEntries.getLeaderCommit());
1337 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1343 public void testLaggingFollowerStarvation() throws Exception {
1344 logStart("testLaggingFollowerStarvation");
1345 new JavaTestKit(getSystem()) {{
1346 String leaderActorId = actorFactory.generateActorId("leader");
1347 String follower1ActorId = actorFactory.generateActorId("follower");
1348 String follower2ActorId = actorFactory.generateActorId("follower");
1350 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1351 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1352 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1353 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1355 MockRaftActorContext leaderActorContext =
1356 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1358 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1359 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1360 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1362 leaderActorContext.setConfigParams(configParams);
1364 leaderActorContext.setReplicatedLog(
1365 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1367 Map<String, String> peerAddresses = new HashMap<>();
1368 peerAddresses.put(follower1ActorId,
1369 follower1Actor.path().toString());
1370 peerAddresses.put(follower2ActorId,
1371 follower2Actor.path().toString());
1373 leaderActorContext.setPeerAddresses(peerAddresses);
1374 leaderActorContext.getTermInformation().update(1, leaderActorId);
1376 RaftActorBehavior leader = createBehavior(leaderActorContext);
1378 leaderActor.underlyingActor().setBehavior(leader);
1380 for(int i=1;i<6;i++) {
1381 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1382 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
1383 assertTrue(newBehavior == leader);
1384 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1387 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1388 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1390 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1391 heartbeats.size() > 1);
1393 // Check if follower-2 got AppendEntries during this time and was not starved
1394 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1396 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1397 appendEntries.size() > 1);
1403 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1404 ActorRef actorRef, RaftRPC rpc) throws Exception {
1405 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1406 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1409 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1411 private final long electionTimeOutIntervalMillis;
1412 private final int snapshotChunkSize;
1414 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1416 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1417 this.snapshotChunkSize = snapshotChunkSize;
1421 public FiniteDuration getElectionTimeOutInterval() {
1422 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1426 public int getSnapshotChunkSize() {
1427 return snapshotChunkSize;