1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
35 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
38 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
44 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
45 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
46 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
47 import scala.concurrent.duration.FiniteDuration;
49 public class LeaderTest extends AbstractLeaderTest {
// Identifiers used for the follower and the leader in these tests.
51 static final String FOLLOWER_ID = "follower";
52 public static final String LEADER_ID = "leader";
// Test actor standing in for the leader; created via actorFactory, which is declared outside this view.
54 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
55 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
// Test actor standing in for the follower; the tests query it through MessageCollectorActor.
57 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
58 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
// Behavior under test; each test assigns a new Leader instance to it.
60 private Leader leader;
64 public void tearDown() throws Exception {
// Passes the string "foo" to a freshly created Leader and asserts that the behavior
// returned by handleMessage is still an instance of Leader.
73 public void testHandleMessageForUnknownMessage() throws Exception {
74 logStart("testHandleMessageForUnknownMessage");
76 leader = new Leader(createActorContext());
78 // handle message should return the Leader state when it receives an
// NOTE(review): the continuation of the comment above is not visible here; presumably "unknown message".
80 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
81 Assert.assertTrue(behavior instanceof Leader);
// Checks the AppendEntries messages a new Leader sends to its follower: first an immediate
// one with no entries, then - after a simulated reply, a sleep of one heartbeat interval and
// a SendHeartBeat - one that carries the single entry at lastIndex.
// NOTE(review): `term` is used below but its declaration is not visible in this view.
85 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
86 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
88 MockRaftActorContext actorContext = createActorContextWithFollower();
// A non-default payload version, so the assertions below can tell it was propagated.
89 short payloadVersion = (short)5;
90 actorContext.setPayloadVersion(payloadVersion);
93 actorContext.getTermInformation().update(term, "");
95 leader = new Leader(actorContext);
97 // Leader should send an immediate heartbeat with no entries as follower is inactive.
98 long lastIndex = actorContext.getReplicatedLog().lastIndex();
99 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
100 assertEquals("getTerm", term, appendEntries.getTerm());
101 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
102 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
103 assertEquals("Entries size", 0, appendEntries.getEntries().size());
104 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
106 // The follower would normally reply - simulate that explicitly here.
// The reply carries lastIndex - 1 (presumably the follower's last log index - confirm), which is
// why the next AppendEntries is expected to have prevLogIndex lastIndex - 1 and one entry.
107 leader.handleMessage(followerActor, new AppendEntriesReply(
108 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
109 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the messages collected so far so the next expectFirstMatching sees only new ones.
111 followerActor.underlyingActor().clear();
113 // Sleep for the heartbeat interval so AppendEntries is sent.
114 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
115 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
117 leader.handleMessage(leaderActor, new SendHeartBeat());
119 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
120 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
121 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
122 assertEquals("Entries size", 1, appendEntries.getEntries().size());
123 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
124 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
125 assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
// Helper: creates a mock log entry with payload "foo", appends it to the context's replicated
// log and passes a Replicate message for it to the leader, returning the resulting behavior.
// NOTE(review): the constructor-argument line of the entry is not visible in this view;
// presumably it uses `index` - confirm.
129 private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
130 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
131 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
133 actorContext.getReplicatedLog().append(newEntry);
134 return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
// After the follower has acknowledged the initial heartbeat, replicating one new entry must
// produce an AppendEntries to the follower that contains exactly that entry.
// NOTE(review): `term` is used below but its declaration is not visible in this view.
138 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
139 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
141 MockRaftActorContext actorContext = createActorContextWithFollower();
144 actorContext.getTermInformation().update(term, "");
146 leader = new Leader(actorContext);
148 // Leader will send an immediate heartbeat - ignore it.
149 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
151 // The follower would normally reply - simulate that explicitly here.
152 long lastIndex = actorContext.getReplicatedLog().lastIndex();
153 leader.handleMessage(followerActor, new AppendEntriesReply(
154 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
155 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the heartbeat collected so far so only the replication traffic is inspected.
157 followerActor.underlyingActor().clear();
// NOTE(review): an entry for lastIndex + 1 is appended here and sendReplicate() below appends
// another entry of its own, so the log appears to receive two entries for this step. The local
// `payload` is still needed for the final assertion - confirm whether this append is intended.
159 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
160 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
161 1, lastIndex + 1, payload);
162 actorContext.getReplicatedLog().append(newEntry);
163 RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
165 // State should not change
166 assertTrue(raftBehavior instanceof Leader);
168 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
169 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
170 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
171 assertEquals("Entries size", 1, appendEntries.getEntries().size());
172 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
173 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
174 assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
// Sends five Replicate messages in a row without any follower reply and asserts that only a
// single AppendEntries reaches the follower.
// NOTE(review): `term` is used below but its declaration is not visible in this view.
178 public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
// Fixed: the log marker previously named a different test (copy-paste).
179 logStart("testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent");
181 MockRaftActorContext actorContext = createActorContextWithFollower();
// Heartbeat interval of 5 seconds, so it does not expire while the replicates are sent.
182 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
184 public FiniteDuration getHeartBeatInterval() {
185 return FiniteDuration.apply(5, TimeUnit.SECONDS);
190 actorContext.getTermInformation().update(term, "");
192 leader = new Leader(actorContext);
194 // Leader will send an immediate heartbeat - ignore it.
195 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
197 // The follower would normally reply - simulate that explicitly here.
198 long lastIndex = actorContext.getReplicatedLog().lastIndex();
199 leader.handleMessage(followerActor, new AppendEntriesReply(
200 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
201 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the heartbeat collected so far so only the replication traffic is counted.
203 followerActor.underlyingActor().clear();
205 for(int i=0;i<5;i++) {
206 sendReplicate(actorContext, lastIndex+i+1);
209 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
210 // We expect only 1 message to be sent because of two reasons,
211 // - an append entries reply was not received
212 // - the heartbeat interval has not expired
213 // In this scenario if multiple messages are sent they would likely be duplicates
214 assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
// Sends three Replicate messages that are each acknowledged by the follower, then two more
// without acknowledgement, and asserts that four AppendEntries reach the follower.
// NOTE(review): `term` is used below but its declaration is not visible in this view.
218 public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
219 logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
221 MockRaftActorContext actorContext = createActorContextWithFollower();
// Heartbeat interval of 5 seconds, so it does not expire while the replicates are sent.
222 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
224 public FiniteDuration getHeartBeatInterval() {
225 return FiniteDuration.apply(5, TimeUnit.SECONDS);
230 actorContext.getTermInformation().update(term, "");
232 leader = new Leader(actorContext);
234 // Leader will send an immediate heartbeat - ignore it.
235 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
237 // The follower would normally reply - simulate that explicitly here.
238 long lastIndex = actorContext.getReplicatedLog().lastIndex();
239 leader.handleMessage(followerActor, new AppendEntriesReply(
240 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
241 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the heartbeat collected so far so only the replication traffic is counted.
243 followerActor.underlyingActor().clear();
// First three replicates: each one is followed by a simulated successful reply.
245 for(int i=0;i<3;i++) {
246 sendReplicate(actorContext, lastIndex+i+1);
247 leader.handleMessage(followerActor, new AppendEntriesReply(
248 FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
// Last two replicates: no reply is simulated.
252 for(int i=3;i<5;i++) {
253 sendReplicate(actorContext, lastIndex + i + 1);
256 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
257 // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
258 // get sent to the follower - but not the 5th
259 assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
// Fixed: the expected value now comes first and the observed index second, so a failure
// message reports them the right way round (they were previously swapped).
261 for(int i=0;i<4;i++) {
262 long actualIndex = allMessages.get(i).getEntries().get(0).getIndex();
263 assertEquals("Entry getIndex", i+2, actualIndex);
// Replicates one entry, waits longer than the 500 ms heartbeat interval without a follower
// reply, then handles a SendHeartBeat; the follower must have received two AppendEntries,
// each carrying the same single entry at lastIndex + 1.
// NOTE(review): `term` is used below but its declaration is not visible in this view.
268 public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
269 logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
271 MockRaftActorContext actorContext = createActorContextWithFollower();
272 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
274 public FiniteDuration getHeartBeatInterval() {
275 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
280 actorContext.getTermInformation().update(term, "");
282 leader = new Leader(actorContext);
284 // Leader will send an immediate heartbeat - ignore it.
285 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
287 // The follower would normally reply - simulate that explicitly here.
288 long lastIndex = actorContext.getReplicatedLog().lastIndex();
289 leader.handleMessage(followerActor, new AppendEntriesReply(
290 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
291 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the heartbeat collected so far so only the messages sent below are counted.
293 followerActor.underlyingActor().clear();
295 sendReplicate(actorContext, lastIndex+1);
297 // Wait slightly longer than heartbeat duration
298 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
300 leader.handleMessage(leaderActor, new SendHeartBeat());
302 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
303 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
305 assertEquals(1, allMessages.get(0).getEntries().size());
306 assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
307 assertEquals(1, allMessages.get(1).getEntries().size());
// Fixed: this assertion previously re-checked message 0, so the entry index of the second
// (duplicate) message was never verified.
308 assertEquals(lastIndex+1, allMessages.get(1).getEntries().get(0).getIndex());
// With a 100 ms heartbeat interval, handles three SendHeartBeat messages spaced 150 ms apart
// and asserts that the follower received three AppendEntries.
// NOTE(review): `term` is used below but its declaration is not visible in this view.
313 public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
314 logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
316 MockRaftActorContext actorContext = createActorContextWithFollower();
317 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
319 public FiniteDuration getHeartBeatInterval() {
320 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
325 actorContext.getTermInformation().update(term, "");
327 leader = new Leader(actorContext);
329 // Leader will send an immediate heartbeat - ignore it.
330 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
332 // The follower would normally reply - simulate that explicitly here.
333 long lastIndex = actorContext.getReplicatedLog().lastIndex();
334 leader.handleMessage(followerActor, new AppendEntriesReply(
335 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
336 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the heartbeat collected so far so only the heartbeats sent below are counted.
338 followerActor.underlyingActor().clear();
// Each sleep (150 ms) is longer than the configured heartbeat interval (100 ms).
340 for(int i=0;i<3;i++) {
341 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
342 leader.handleMessage(leaderActor, new SendHeartBeat());
345 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
346 assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
// Handles a SendHeartBeat and then immediately replicates one entry; the follower must have
// received two AppendEntries: the first without entries, the second with one entry.
// NOTE(review): `term` is used below but its declaration is not visible in this view.
350 public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
351 logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
353 MockRaftActorContext actorContext = createActorContextWithFollower();
354 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
356 public FiniteDuration getHeartBeatInterval() {
357 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
362 actorContext.getTermInformation().update(term, "");
364 leader = new Leader(actorContext);
366 // Leader will send an immediate heartbeat - ignore it.
367 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
369 // The follower would normally reply - simulate that explicitly here.
370 long lastIndex = actorContext.getReplicatedLog().lastIndex();
371 leader.handleMessage(followerActor, new AppendEntriesReply(
372 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
373 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the heartbeat collected so far so only the messages sent below are counted.
375 followerActor.underlyingActor().clear();
// Sleep longer than the 100 ms heartbeat interval before triggering the heartbeat.
377 Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
378 leader.handleMessage(leaderActor, new SendHeartBeat());
379 sendReplicate(actorContext, lastIndex+1);
381 List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
382 assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
384 assertEquals(0, allMessages.get(0).getEntries().size());
385 assertEquals(1, allMessages.get(1).getEntries().size());
// Uses a context created by createActorContext() (no follower is set up in this test),
// replicates one new entry and asserts that the commit index advances to that entry and that
// ApplyState messages for indexes 1..newLogIndex arrive at the leader actor.
390 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
391 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
393 MockRaftActorContext actorContext = createActorContext();
395 leader = new Leader(actorContext);
397 actorContext.setLastApplied(0);
399 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
400 long term = actorContext.getTermInformation().getCurrentTerm();
401 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
402 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
404 actorContext.getReplicatedLog().append(newEntry);
406 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
407 new Replicate(leaderActor, "state-id", newEntry));
409 // State should not change
410 assertTrue(raftBehavior instanceof Leader);
412 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
414 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
415 // one since lastApplied state is 0.
416 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
417 leaderActor, ApplyState.class);
418 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
// The i-th ApplyState is expected to carry the entry at index i + 1.
420 for(int i = 0; i <= newLogIndex - 1; i++ ) {
421 ApplyState applyState = applyStateList.get(i);
422 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
423 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
// The last ApplyState must carry the new entry's data and the identifier passed to Replicate.
426 ApplyState last = applyStateList.get((int) newLogIndex - 1);
427 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
428 assertEquals("getIdentifier", "state-id", last.getIdentifier());
// While a snapshot install to the follower is registered but not yet acknowledged, a heartbeat
// must send an AppendEntries with no entries; once the chunk is marked as sent successfully,
// the next heartbeat must send an InstallSnapshot with the snapshot index.
432 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
433 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
435 MockRaftActorContext actorContext = createActorContextWithFollower();
// Snapshot content, serialized below with toByteString().
437 Map<String, String> leadersSnapshot = new HashMap<>();
438 leadersSnapshot.put("1", "A");
439 leadersSnapshot.put("2", "B");
440 leadersSnapshot.put("3", "C");
443 actorContext.getReplicatedLog().removeFrom(0);
445 final int followersLastIndex = 2;
446 final int snapshotIndex = 3;
447 final int newEntryIndex = 4;
448 final int snapshotTerm = 1;
449 final int currentTerm = 2;
451 // set the snapshot variables in replicatedlog
452 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
453 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
454 actorContext.setCommitIndex(followersLastIndex);
455 //set follower timeout to 2 mins, helps during debugging
456 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
458 leader = new Leader(actorContext);
// NOTE(review): `entry` is not referenced by any later line visible in this view - confirm
// whether it (and newEntryIndex / currentTerm) is still needed.
461 ReplicatedLogImplEntry entry =
462 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
463 new MockRaftActorContext.MockPayload("D"));
465 //update follower timestamp
466 leader.markFollowerActive(FOLLOWER_ID);
// Register a snapshot transfer for the follower directly on the leader.
468 ByteString bs = toByteString(leadersSnapshot);
469 leader.setSnapshot(Optional.of(bs));
470 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
471 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
473 //send first chunk and no InstallSnapshotReply received yet
475 fts.incrementChunkIndex();
477 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
478 TimeUnit.MILLISECONDS);
480 leader.handleMessage(leaderActor, new SendHeartBeat());
482 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
484 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
486 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
488 //InstallSnapshotReply received
489 fts.markSendStatus(true);
491 leader.handleMessage(leaderActor, new SendHeartBeat());
493 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
495 assertEquals(snapshotIndex, is.getLastIncludedIndex());
// With the leader's log trimmed to a snapshot at index 3 and the commit index set to 2,
// replicating a new entry must leave the behavior a Leader and put the snapshot manager into
// the capturing state.
499 public void testSendAppendEntriesSnapshotScenario() throws Exception {
500 logStart("testSendAppendEntriesSnapshotScenario");
502 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): leadersSnapshot is populated but not referenced by any later line visible here.
504 Map<String, String> leadersSnapshot = new HashMap<>();
505 leadersSnapshot.put("1", "A");
506 leadersSnapshot.put("2", "B");
507 leadersSnapshot.put("3", "C");
510 actorContext.getReplicatedLog().removeFrom(0);
512 final int followersLastIndex = 2;
513 final int snapshotIndex = 3;
514 final int newEntryIndex = 4;
515 final int snapshotTerm = 1;
516 final int currentTerm = 2;
518 // set the snapshot variables in replicatedlog
519 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
520 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
521 actorContext.setCommitIndex(followersLastIndex);
523 leader = new Leader(actorContext);
525 // Leader will send an immediate heartbeat - ignore it.
526 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// New entry at index 4, appended to the leader's log before it is replicated.
529 ReplicatedLogImplEntry entry =
530 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
531 new MockRaftActorContext.MockPayload("D"));
533 actorContext.getReplicatedLog().append(entry);
535 //update follower timestamp
536 leader.markFollowerActive(FOLLOWER_ID);
538 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
539 RaftActorBehavior raftBehavior = leader.handleMessage(
540 leaderActor, new Replicate(null, "state-id", entry));
542 assertTrue(raftBehavior instanceof Leader);
544 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
// With no snapshot held by the leader, replicating a new entry must start a snapshot capture
// whose CaptureSnapshot carries the expected indexes and terms; a second Replicate while the
// capture is in progress must leave the same CaptureSnapshot instance in place.
548 public void testInitiateInstallSnapshot() throws Exception {
549 logStart("testInitiateInstallSnapshot");
551 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): leadersSnapshot is populated but not referenced by any later line visible here.
553 Map<String, String> leadersSnapshot = new HashMap<>();
554 leadersSnapshot.put("1", "A");
555 leadersSnapshot.put("2", "B");
556 leadersSnapshot.put("3", "C");
559 actorContext.getReplicatedLog().removeFrom(0);
561 final int followersLastIndex = 2;
562 final int snapshotIndex = 3;
563 final int newEntryIndex = 4;
564 final int snapshotTerm = 1;
565 final int currentTerm = 2;
567 // set the snapshot variables in replicatedlog
568 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
569 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
570 actorContext.setLastApplied(3);
571 actorContext.setCommitIndex(followersLastIndex);
573 leader = new Leader(actorContext);
575 // Leader will send an immediate heartbeat - ignore it.
576 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
578 // set the snapshot as absent and check if capture-snapshot is invoked.
579 leader.setSnapshot(Optional.<ByteString>absent());
// New entry at index 4, appended to the leader's log before it is replicated.
582 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
583 new MockRaftActorContext.MockPayload("D"));
585 actorContext.getReplicatedLog().append(entry);
587 //update follower timestamp
588 leader.markFollowerActive(FOLLOWER_ID);
590 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
592 assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
594 CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
// Expected values mirror the setup above: lastApplied 3 / snapshot term 1, new entry index 4 / term 2.
596 assertTrue(cs.isInstallSnapshotInitiated());
597 assertEquals(3, cs.getLastAppliedIndex());
598 assertEquals(1, cs.getLastAppliedTerm());
599 assertEquals(4, cs.getLastIndex());
600 assertEquals(2, cs.getLastTerm());
602 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
603 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
605 Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
// Handling a SendInstallSnapshot must keep the behavior a Leader and send the follower an
// InstallSnapshot that carries data, the snapshot index and term, and the current term.
609 public void testInstallSnapshot() throws Exception {
610 logStart("testInstallSnapshot");
612 MockRaftActorContext actorContext = createActorContextWithFollower();
// Snapshot content, serialized below with toByteString().
614 Map<String, String> leadersSnapshot = new HashMap<>();
615 leadersSnapshot.put("1", "A");
616 leadersSnapshot.put("2", "B");
617 leadersSnapshot.put("3", "C");
620 actorContext.getReplicatedLog().removeFrom(0);
622 final int followersLastIndex = 2;
623 final int snapshotIndex = 3;
624 final int snapshotTerm = 1;
625 final int currentTerm = 2;
627 // set the snapshot variables in replicatedlog
628 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
629 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
630 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
631 actorContext.setCommitIndex(followersLastIndex);
633 leader = new Leader(actorContext);
635 // Ignore initial heartbeat.
636 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
638 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
639 new SendInstallSnapshot(toByteString(leadersSnapshot)));
641 assertTrue(raftBehavior instanceof Leader);
643 // check if installsnapshot gets called with the correct values.
645 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
647 assertNotNull(installSnapshot.getData());
648 assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
649 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
651 assertEquals(currentTerm, installSnapshot.getTerm());
// After a successful InstallSnapshotReply for the last chunk, the leader must drop the
// follower's snapshot-transfer state and set the follower's match index to the snapshot index
// and its next index to snapshot index + 1.
655 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
656 logStart("testHandleInstallSnapshotReplyLastChunk");
658 MockRaftActorContext actorContext = createActorContextWithFollower();
660 final int followersLastIndex = 2;
661 final int snapshotIndex = 3;
662 final int snapshotTerm = 1;
663 final int currentTerm = 2;
665 actorContext.setCommitIndex(followersLastIndex);
667 leader = new Leader(actorContext);
669 // Ignore initial heartbeat.
670 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Snapshot content, serialized below with toByteString().
672 Map<String, String> leadersSnapshot = new HashMap<>();
673 leadersSnapshot.put("1", "A");
674 leadersSnapshot.put("2", "B");
675 leadersSnapshot.put("3", "C");
677 // set the snapshot variables in replicatedlog
679 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
680 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
681 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
// Register a snapshot transfer for the follower and advance it until the last chunk.
683 ByteString bs = toByteString(leadersSnapshot);
684 leader.setSnapshot(Optional.of(bs));
685 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
686 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
687 while(!fts.isLastChunk(fts.getChunkIndex())) {
689 fts.incrementChunkIndex();
693 actorContext.getReplicatedLog().removeFrom(0);
695 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
696 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
698 assertTrue(raftBehavior instanceof Leader);
700 assertEquals(0, leader.followerSnapshotSize());
701 assertEquals(1, leader.followerLogSize());
702 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
// Fixed: the match-index assertion was duplicated verbatim; the redundant copy was removed.
704 assertEquals(snapshotIndex, fli.getMatchIndex());
706 assertEquals(snapshotIndex + 1, fli.getNextIndex());
// Drives a three-chunk snapshot install: each successful InstallSnapshotReply must trigger the
// next InstallSnapshot chunk, and a reply after the final chunk must not trigger another one.
710 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
711 logStart("testSendSnapshotfromInstallSnapshotReply");
713 MockRaftActorContext actorContext = createActorContextWithFollower();
715 final int followersLastIndex = 2;
716 final int snapshotIndex = 3;
717 final int snapshotTerm = 1;
718 final int currentTerm = 2;
// Overrides the snapshot chunk size (the returned value is not visible in this view) and sets
// the heartbeat and isolated-leader-check intervals to 9 and 10 seconds.
720 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
722 public int getSnapshotChunkSize() {
726 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
727 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
729 actorContext.setConfigParams(configParams);
730 actorContext.setCommitIndex(followersLastIndex);
732 leader = new Leader(actorContext);
// Snapshot content, serialized below with toByteString().
734 Map<String, String> leadersSnapshot = new HashMap<>();
735 leadersSnapshot.put("1", "A");
736 leadersSnapshot.put("2", "B");
737 leadersSnapshot.put("3", "C");
739 // set the snapshot variables in replicatedlog
740 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
741 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
742 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
744 ByteString bs = toByteString(leadersSnapshot);
745 leader.setSnapshot(Optional.of(bs));
747 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
// First chunk.
749 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
751 assertEquals(1, installSnapshot.getChunkIndex());
752 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 1; the second chunk is expected next.
754 followerActor.underlyingActor().clear();
755 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
756 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
758 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
760 assertEquals(2, installSnapshot.getChunkIndex());
761 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 2; a further InstallSnapshot is expected (its chunk index is not asserted).
763 followerActor.underlyingActor().clear();
764 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
765 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
767 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
769 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
770 followerActor.underlyingActor().clear();
771 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
772 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
774 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
776 Assert.assertNull(installSnapshot);
// After the first snapshot chunk is sent, a failed InstallSnapshotReply with chunk index -1
// followed by a heartbeat must make the leader send chunk 1 (of 3) again.
781 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
782 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
784 MockRaftActorContext actorContext = createActorContextWithFollower();
786 final int followersLastIndex = 2;
787 final int snapshotIndex = 3;
788 final int snapshotTerm = 1;
789 final int currentTerm = 2;
// Overrides the snapshot chunk size; the returned value is not visible in this view.
791 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
793 public int getSnapshotChunkSize() {
798 actorContext.setCommitIndex(followersLastIndex);
800 leader = new Leader(actorContext);
// Snapshot content, serialized below with toByteString().
802 Map<String, String> leadersSnapshot = new HashMap<>();
803 leadersSnapshot.put("1", "A");
804 leadersSnapshot.put("2", "B");
805 leadersSnapshot.put("3", "C");
807 // set the snapshot variables in replicatedlog
808 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
809 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
810 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
812 ByteString bs = toByteString(leadersSnapshot);
813 leader.setSnapshot(Optional.of(bs));
// NOTE(review): the reason for this one-second pause is not stated in the visible code.
815 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
816 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
818 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
820 assertEquals(1, installSnapshot.getChunkIndex());
821 assertEquals(3, installSnapshot.getTotalChunks());
823 followerActor.underlyingActor().clear();
// Simulate a failed reply that reports chunk index -1.
825 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
826 FOLLOWER_ID, -1, false));
// Wait one heartbeat interval, then trigger the heartbeat that should resend the chunk.
828 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
829 TimeUnit.MILLISECONDS);
831 leader.handleMessage(leaderActor, new SendHeartBeat());
833 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
835 assertEquals(1, installSnapshot.getChunkIndex());
836 assertEquals(3, installSnapshot.getTotalChunks());
/**
 * Checks the "last chunk hash code" carried by InstallSnapshot messages: the first chunk must
 * carry AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE and the second chunk must carry the hash
 * code of the first chunk's data.
 */
840 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
841 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
843 MockRaftActorContext actorContext = createActorContextWithFollower();
845 final int followersLastIndex = 2;
846 final int snapshotIndex = 3;
847 final int snapshotTerm = 1;
848 final int currentTerm = 2;
// Replace the config with one that overrides the snapshot chunk size; the assertions below
// expect the serialized snapshot to be split into 3 chunks.
850 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
852 public int getSnapshotChunkSize() {
857 actorContext.setCommitIndex(followersLastIndex);
859 leader = new Leader(actorContext);
// Three-entry map that serves as the leader's snapshot payload.
861 Map<String, String> leadersSnapshot = new HashMap<>();
862 leadersSnapshot.put("1", "A");
863 leadersSnapshot.put("2", "B");
864 leadersSnapshot.put("3", "C");
866 // set the snapshot variables in replicatedlog
867 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
868 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
869 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
871 ByteString bs = toByteString(leadersSnapshot);
872 leader.setSnapshot(Optional.of(bs));
874 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
// First chunk: index 1 of 3, carrying the initial "previous chunk" hash code.
876 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
878 assertEquals(1, installSnapshot.getChunkIndex());
879 assertEquals(3, installSnapshot.getTotalChunks());
880 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
// Remember the hash of the first chunk's data; the second chunk is expected to echo it.
882 int hashCode = installSnapshot.getData().hashCode();
884 followerActor.underlyingActor().clear();
// Acknowledge chunk 1 successfully so the leader moves on to chunk 2.
886 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
887 FOLLOWER_ID, 1, true));
889 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
891 assertEquals(2, installSnapshot.getChunkIndex());
892 assertEquals(3, installSnapshot.getTotalChunks());
893 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
/**
 * Exercises FollowerToSnapshot directly: walks the serialized snapshot in steps of 50 bytes,
 * checks the size and index of every chunk returned by getNextChunk(), and finally checks the
 * reported total number of chunks.
 */
897 public void testFollowerToSnapshotLogic() {
898 logStart("testFollowerToSnapshotLogic");
900 MockRaftActorContext actorContext = createActorContext();
// Replace the config with one that overrides the snapshot chunk size.
// NOTE(review): the loop below steps by a hard-coded 50, presumably mirroring this override - confirm.
902 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
904 public int getSnapshotChunkSize() {
909 leader = new Leader(actorContext);
911 Map<String, String> leadersSnapshot = new HashMap<>();
912 leadersSnapshot.put("1", "A");
913 leadersSnapshot.put("2", "B");
914 leadersSnapshot.put("3", "C");
// Keep both the ByteString and its raw bytes; the raw length drives the loop bounds.
916 ByteString bs = toByteString(leadersSnapshot);
917 byte[] barray = bs.toByteArray();
// FollowerToSnapshot is an inner class of the leader, hence the leader.new syntax.
919 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
920 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
922 assertEquals(bs.size(), barray.length);
// i is the start offset of the current chunk; j - i is the expected size of that chunk.
925 for (int i=0; i < barray.length; i = i + 50) {
// Guard for the final chunk, which may be shorter than 50 bytes.
929 if (i + 50 > barray.length) {
933 ByteString chunk = fts.getNextChunk();
934 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
935 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
// Mark the chunk as sent and advance, except after the last chunk.
937 fts.markSendStatus(true);
938 if (!fts.isLastChunk(chunkIndex)) {
939 fts.incrementChunkIndex();
// After the loop the local chunk counter must equal the total reported by FollowerToSnapshot.
943 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
/**
 * Supplies the behavior under test: a new Leader built on the given context.
 *
 * @param actorContext the context handed to the Leader constructor
 * @return a newly constructed Leader
 */
946 @Override protected RaftActorBehavior createBehavior(
947 RaftActorContext actorContext) {
948 return new Leader(actorContext);
/**
 * Creates a context backed by the shared leaderActor.
 *
 * @return the result of createActorContext(leaderActor)
 */
952 protected MockRaftActorContext createActorContext() {
953 return createActorContext(leaderActor);
/**
 * Creates a context with id LEADER_ID backed by the given actor.
 *
 * @param actorRef the actor passed on to createActorContext(String, ActorRef)
 * @return the result of createActorContext(LEADER_ID, actorRef)
 */
957 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
958 return createActorContext(LEADER_ID, actorRef);
/**
 * Creates the default leader context and registers followerActor's path under FOLLOWER_ID
 * as its only peer address.
 */
961 private MockRaftActorContext createActorContextWithFollower() {
962 MockRaftActorContext actorContext = createActorContext();
// Single-entry immutable map: FOLLOWER_ID -> path of followerActor.
963 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
964 followerActor.path().toString()).build());
/**
 * Builds a MockRaftActorContext for the given id and actor on the test actor system, using a
 * config with a 50 ms heartbeat interval and an election timeout factor of 100000.
 *
 * @param id the id given to the new context
 * @param actorRef the actor given to the new context
 */
968 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
969 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
970 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
// Very large factor, presumably to keep election timeouts from firing during a test - confirm.
971 configParams.setElectionTimeoutFactor(100000);
972 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
973 context.setConfigParams(configParams);
/**
 * Builds a context for FOLLOWER_ID backed by followerActor, with leaderActor registered as
 * its only peer under LEADER_ID.
 *
 * @return the follower context
 */
977 private MockRaftActorContext createFollowerActorContextWithLeader() {
978 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// This replaces the config installed by createActorContext, so only the election timeout
// factor set here is customized; the 50 ms heartbeat from that helper no longer applies.
979 DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
980 followerConfig.setElectionTimeoutFactor(10000);
981 followerActorContext.setConfigParams(followerConfig);
982 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
983 return followerActorContext;
/**
 * Leader and follower are given identical logs and commit index 1. Creating the Leader must
 * produce an AppendEntries with leaderCommit 1, no entries and prevLogIndex 0, and the
 * follower's reply must report last log index 2 in term 1.
 */
987 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
988 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
990 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
992 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Install a real Follower behavior on the follower actor.
994 Follower follower = new Follower(followerActorContext);
995 followerActor.underlyingActor().setBehavior(follower);
// NOTE(review): createActorContextWithFollower() already registered this same peer address,
// so this block looks redundant - confirm before removing.
997 Map<String, String> peerAddresses = new HashMap<>();
998 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1000 leaderActorContext.setPeerAddresses(peerAddresses);
1002 leaderActorContext.getReplicatedLog().removeFrom(0);
// createEntries(0, 3, 1): presumably entries with indexes 0..2 in term 1, consistent with the
// lastIndex 2 / lastTerm 1 asserted below - confirm against MockReplicatedLogBuilder.
1005 leaderActorContext.setReplicatedLog(
1006 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1008 leaderActorContext.setCommitIndex(1);
1010 followerActorContext.getReplicatedLog().removeFrom(0);
1012 // follower too has the exact same log entries and has the same commit index
1013 followerActorContext.setReplicatedLog(
1014 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1016 followerActorContext.setCommitIndex(1);
// No message is handed to the leader explicitly; the AppendEntries expected below results
// from constructing the Leader.
1018 leader = new Leader(leaderActorContext);
1020 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1022 assertEquals(1, appendEntries.getLeaderCommit());
1023 assertEquals(0, appendEntries.getEntries().size());
1024 assertEquals(0, appendEntries.getPrevLogIndex());
// Reply produced by the Follower behavior and collected at the leader actor.
1026 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1027 leaderActor, AppendEntriesReply.class);
1029 assertEquals(2, appendEntriesReply.getLogLastIndex());
1030 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): the two assertions below repeat the two directly above; the comment mentions
// a "next index" but the value checked is getLogLastIndex().
1032 // follower returns its next index
1033 assertEquals(2, appendEntriesReply.getLogLastIndex());
1034 assertEquals(1, appendEntriesReply.getLogLastTerm());
/**
 * Leader (commit index 1) and follower (commit index 2) share the same log. After the
 * initial exchange and one manually triggered heartbeat, the leader is expected to send
 * leaderCommit 2 with prevLogIndex 2 and no entries, and the follower's commit index must
 * still be 2.
 */
1040 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1041 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1043 MockRaftActorContext leaderActorContext = createActorContext();
1045 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1046 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
// Install a real Follower behavior on the follower actor.
1048 Follower follower = new Follower(followerActorContext);
1049 followerActor.underlyingActor().setBehavior(follower);
1051 Map<String, String> leaderPeerAddresses = new HashMap<>();
1052 leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1054 leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1056 leaderActorContext.getReplicatedLog().removeFrom(0);
1058 leaderActorContext.setReplicatedLog(
1059 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1061 leaderActorContext.setCommitIndex(1);
1063 followerActorContext.getReplicatedLog().removeFrom(0);
1065 followerActorContext.setReplicatedLog(
1066 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1068 // follower has the same log entries but its commit index > leaders commit index
1069 followerActorContext.setCommitIndex(2);
1071 leader = new Leader(leaderActorContext);
1073 // Initial heartbeat
1074 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1076 assertEquals(1, appendEntries.getLeaderCommit());
1077 assertEquals(0, appendEntries.getEntries().size());
1078 assertEquals(0, appendEntries.getPrevLogIndex());
1080 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1081 leaderActor, AppendEntriesReply.class);
1083 assertEquals(2, appendEntriesReply.getLogLastIndex());
1084 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): this installs the *follower* behavior on the leader actor; the sibling
// AppendEntriesReply tests install `leader` at the equivalent point - confirm this is intended.
1086 leaderActor.underlyingActor().setBehavior(follower);
// Feed the follower's reply to the leader by hand.
1087 leader.handleMessage(followerActor, appendEntriesReply);
// Reset both collectors so the assertions below only see the next round of messages.
1089 leaderActor.underlyingActor().clear();
1090 followerActor.underlyingActor().clear();
// Let one heartbeat interval elapse, then trigger the heartbeat manually.
1092 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1093 TimeUnit.MILLISECONDS);
1095 leader.handleMessage(leaderActor, new SendHeartBeat());
1097 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1099 assertEquals(2, appendEntries.getLeaderCommit());
1100 assertEquals(0, appendEntries.getEntries().size());
1101 assertEquals(2, appendEntries.getPrevLogIndex());
1103 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1105 assertEquals(2, appendEntriesReply.getLogLastIndex());
1106 assertEquals(1, appendEntriesReply.getLogLastTerm());
1108 assertEquals(2, followerActorContext.getCommitIndex());
/**
 * Leader log holds three entries (commit index 2) while the follower log holds one (commit
 * index 1). After the leader handles the follower's first reply, the follower is expected to
 * receive the entries at index 1 and index 2 in separate AppendEntries messages and end up
 * with commit index 2 and last index 2; the leader's next index for the follower must be 3.
 */
1114 public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1115 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1117 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// 1000 second heartbeat interval, presumably so no scheduled heartbeat interferes with the
// message counts asserted below - confirm.
1118 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1119 new FiniteDuration(1000, TimeUnit.SECONDS));
1121 leaderActorContext.setReplicatedLog(
1122 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1123 long leaderCommitIndex = 2;
1124 leaderActorContext.setCommitIndex(leaderCommitIndex);
1125 leaderActorContext.setLastApplied(leaderCommitIndex);
1127 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1129 followerActorContext.setReplicatedLog(
1130 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1131 followerActorContext.setCommitIndex(1);
1132 followerActorContext.setLastApplied(1);
// Install a real Follower behavior on the follower actor.
1134 Follower follower = new Follower(followerActorContext);
1135 followerActor.underlyingActor().setBehavior(follower);
1137 leader = new Leader(leaderActorContext);
// Capture the initial exchange, then reset both collectors before the second phase.
1139 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1140 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1142 MessageCollectorActor.clearMessages(followerActor);
1143 MessageCollectorActor.clearMessages(leaderActor);
1145 // Verify initial AppendEntries sent with the leader's current commit index.
1146 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1147 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1148 assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
// From here on the leader actor carries the leader behavior, presumably so that later
// replies arriving at it are handled by the leader - confirm.
1150 leaderActor.underlyingActor().setBehavior(leader);
// Hand the captured first reply to the leader manually.
1152 leader.handleMessage(followerActor, appendEntriesReply);
// Wait until two replies and two AppendEntries have been exchanged.
1154 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1155 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1157 // Verify AppendEntries sent with the leader's second log entry.
1158 appendEntries = appendEntriesList.get(0);
1159 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1160 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1161 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
1162 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1164 // Verify AppendEntries sent with the leader's third log entry.
1165 appendEntries = appendEntriesList.get(1);
1166 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1167 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1168 assertEquals("Log entry index", 2, appendEntries.getEntries().get(0).getIndex());
1169 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
// Leader-side bookkeeping for the follower, then the follower's own final state.
1171 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1172 assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1174 assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1175 assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
/**
 * Leader log holds two entries (commit index 1) while the follower starts with an empty log
 * and commit index -1. After the leader handles the follower's first reply, the follower is
 * expected to receive the entries at index 0 and index 1 in separate AppendEntries messages
 * and end up with commit index 1 and last index 1; the leader's next index for it must be 2.
 */
1179 public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1180 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1182 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// 1000 second heartbeat interval, presumably so no scheduled heartbeat interferes with the
// message counts asserted below - confirm.
1183 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1184 new FiniteDuration(1000, TimeUnit.SECONDS));
1186 leaderActorContext.setReplicatedLog(
1187 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1188 long leaderCommitIndex = 1;
1189 leaderActorContext.setCommitIndex(leaderCommitIndex);
1190 leaderActorContext.setLastApplied(leaderCommitIndex);
1192 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
// Builder without createEntries(...): the follower starts with no log entries.
1194 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1195 followerActorContext.setCommitIndex(-1);
1196 followerActorContext.setLastApplied(-1);
// Install a real Follower behavior on the follower actor.
1198 Follower follower = new Follower(followerActorContext);
1199 followerActor.underlyingActor().setBehavior(follower);
1201 leader = new Leader(leaderActorContext);
// Capture the initial exchange, then reset both collectors before the second phase.
1203 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1204 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1206 MessageCollectorActor.clearMessages(followerActor);
1207 MessageCollectorActor.clearMessages(leaderActor);
1209 // Verify initial AppendEntries sent with the leader's current commit index.
1210 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1211 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1212 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
// From here on the leader actor carries the leader behavior, presumably so that later
// replies arriving at it are handled by the leader - confirm.
1214 leaderActor.underlyingActor().setBehavior(leader);
// Hand the captured first reply to the leader manually.
1216 leader.handleMessage(followerActor, appendEntriesReply);
// Wait until two replies and two AppendEntries have been exchanged.
1218 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1219 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1221 // Verify AppendEntries sent with the leader's first log entry.
1222 appendEntries = appendEntriesList.get(0);
1223 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1224 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1225 assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex());
1226 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1228 // Verify AppendEntries sent with the leader's second log entry.
1229 appendEntries = appendEntriesList.get(1);
1230 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1231 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1232 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
1233 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
// Leader-side bookkeeping for the follower, then the follower's own final state.
1235 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1236 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1238 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1239 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
/**
 * Leader log is built with createEntries(0, 2, 2) and the follower log with
 * createEntries(0, 1, 1), so their terms differ. After the leader handles the follower's
 * first reply, the follower is expected to receive the leader's entries at index 0 and
 * index 1, emit ApplyState for both with term 2 and the leader's data, and end up with
 * commit index 1, last index 1 and last term 2.
 */
1243 public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1244 logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1246 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// 1000 second heartbeat interval, presumably so no scheduled heartbeat interferes with the
// message counts asserted below - confirm.
1247 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1248 new FiniteDuration(1000, TimeUnit.SECONDS));
1250 leaderActorContext.setReplicatedLog(
1251 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1252 long leaderCommitIndex = 1;
1253 leaderActorContext.setCommitIndex(leaderCommitIndex);
1254 leaderActorContext.setLastApplied(leaderCommitIndex);
// Keep the leader's entries to compare their data with what the follower applies later.
1256 ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1257 ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1259 MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1261 followerActorContext.setReplicatedLog(
1262 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1263 followerActorContext.setCommitIndex(-1);
1264 followerActorContext.setLastApplied(-1);
// Install a real Follower behavior on the follower actor.
1266 Follower follower = new Follower(followerActorContext);
1267 followerActor.underlyingActor().setBehavior(follower);
1269 leader = new Leader(leaderActorContext);
// Capture the initial exchange, then reset both collectors before the second phase.
1271 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1272 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1274 MessageCollectorActor.clearMessages(followerActor);
1275 MessageCollectorActor.clearMessages(leaderActor);
1277 // Verify initial AppendEntries sent with the leader's current commit index.
1278 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1279 assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1280 assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
// From here on the leader actor carries the leader behavior, presumably so that later
// replies arriving at it are handled by the leader - confirm.
1282 leaderActor.underlyingActor().setBehavior(leader);
// Hand the captured first reply to the leader manually.
1284 leader.handleMessage(followerActor, appendEntriesReply);
// Wait until two replies and two AppendEntries have been exchanged.
1286 MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1287 List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1289 // Verify AppendEntries sent with the leader's first log entry.
1290 appendEntries = appendEntriesList.get(0);
1291 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1292 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1293 assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex());
1294 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1296 // Verify AppendEntries sent with the leader's second log entry.
1297 appendEntries = appendEntriesList.get(1);
1298 assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1299 assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1300 assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
1301 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1303 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1304 assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
// The follower actor must have collected one ApplyState per replicated entry.
1306 List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1308 ApplyState applyState = applyStateList.get(0);
1309 assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1310 assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1311 assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1312 applyState.getReplicatedLogEntry().getData());
1314 applyState = applyStateList.get(1);
1315 assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1316 assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1317 assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1318 applyState.getReplicatedLogEntry().getData());
// Final follower state: its log now ends with the leader's term-2 entry.
1320 assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1321 assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1322 assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
/**
 * Passes a successful AppendEntriesReply (last index 2, term 1, payload version 5) straight
 * to handleAppendEntriesReply and checks that the leader stays Leader, moves commit index
 * and last applied from 1 to 2, emits ApplyJournalEntries up to index 2 and a single
 * ApplyState for index 2, and records the follower's payload version.
 */
1326 public void testHandleAppendEntriesReplySuccess() throws Exception {
1327 logStart("testHandleAppendEntriesReplySuccess");
1329 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1331 leaderActorContext.setReplicatedLog(
1332 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
// Start one entry behind the end of the log so the reply has something to commit.
1334 leaderActorContext.setCommitIndex(1);
1335 leaderActorContext.setLastApplied(1);
1336 leaderActorContext.getTermInformation().update(1, "leader");
1338 leader = new Leader(leaderActorContext);
1340 short payloadVersion = 5;
1341 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1343 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1345 assertEquals(RaftState.Leader, raftActorBehavior.state());
1347 assertEquals(2, leaderActorContext.getCommitIndex());
// Messages the leader sent to its own actor as a result of the commit.
1349 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1350 leaderActor, ApplyJournalEntries.class);
1352 assertEquals(2, leaderActorContext.getLastApplied());
1354 assertEquals(2, applyJournalEntries.getToIndex());
1356 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1359 assertEquals(1,applyStateList.size());
1361 ApplyState applyState = applyStateList.get(0);
1363 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
// The payload version from the reply must be stored on the follower's log information.
1365 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1366 assertEquals(payloadVersion, followerInfo.getPayloadVersion());
/**
 * A reply whose follower id is not among the leader's peers (the context is created without
 * any peer addresses) must leave the behavior in the Leader state.
 */
1370 public void testHandleAppendEntriesReplyUnknownFollower(){
1371 logStart("testHandleAppendEntriesReplyUnknownFollower");
1373 MockRaftActorContext leaderActorContext = createActorContext();
1375 leader = new Leader(leaderActorContext);
// Unsuccessful reply from an id the leader was never told about.
1377 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1379 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1381 assertEquals(RaftState.Leader, raftActorBehavior.state());
/**
 * RequestVoteReply messages, whether the vote is granted or not, must leave the behavior in
 * the Leader state.
 */
1385 public void testHandleRequestVoteReply(){
1386 logStart("testHandleRequestVoteReply");
1388 MockRaftActorContext leaderActorContext = createActorContext();
1390 leader = new Leader(leaderActorContext);
1392 // Should be a no-op.
1393 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1394 new RequestVoteReply(1, true));
1396 assertEquals(RaftState.Leader, raftActorBehavior.state());
// Same expectation for a reply with the boolean flag set to false.
1398 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1400 assertEquals(RaftState.Leader, raftActorBehavior.state());
/**
 * With no peers configured, handling IsolatedLeaderCheck must return a Leader instance.
 */
1404 public void testIsolatedLeaderCheckNoFollowers() {
1405 logStart("testIsolatedLeaderCheckNoFollowers");
1407 MockRaftActorContext leaderActorContext = createActorContext();
1409 leader = new Leader(leaderActorContext);
1410 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1411 Assert.assertTrue(behavior instanceof Leader);
/**
 * With two followers, IsolatedLeaderCheck must keep returning a Leader while both or one of
 * them are marked active, and must return an IsolatedLeader once both are marked inactive.
 */
1415 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1416 logStart("testIsolatedLeaderCheckTwoFollowers");
1418 new JavaTestKit(getSystem()) {{
// NOTE(review): both references are obtained from getTestActor() on the same test kit, so
// they presumably point at the same actor - confirm whether two distinct actors were intended.
1420 ActorRef followerActor1 = getTestActor();
1421 ActorRef followerActor2 = getTestActor();
1423 MockRaftActorContext leaderActorContext = createActorContext();
1425 Map<String, String> peerAddresses = new HashMap<>();
1426 peerAddresses.put("follower-1", followerActor1.path().toString());
1427 peerAddresses.put("follower-2", followerActor2.path().toString());
1429 leaderActorContext.setPeerAddresses(peerAddresses);
1431 leader = new Leader(leaderActorContext);
// Case 1: both followers active.
1433 leader.markFollowerActive("follower-1");
1434 leader.markFollowerActive("follower-2");
1435 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1436 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1437 behavior instanceof Leader);
1439 // kill 1 follower and verify if that got killed
1440 final JavaTestKit probe = new JavaTestKit(getSystem());
1441 probe.watch(followerActor1);
1442 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1443 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1444 assertEquals(termMsg1.getActor(), followerActor1);
// Case 2: one follower inactive, one active.
1446 leader.markFollowerInActive("follower-1");
1447 leader.markFollowerActive("follower-2");
1448 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1449 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1450 behavior instanceof Leader);
1452 // kill 2nd follower and leader should change to Isolated leader
// NOTE(review): a PoisonPill is sent here with a null sender before the probe starts
// watching, and sent again right after watch(...) - the first send looks redundant.
1453 followerActor2.tell(PoisonPill.getInstance(), null);
1454 probe.watch(followerActor2);
1455 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1456 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1457 assertEquals(termMsg2.getActor(), followerActor2);
// Case 3: both followers inactive.
1459 leader.markFollowerInActive("follower-2");
1460 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1461 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1462 behavior instanceof IsolatedLeader);
/**
 * Starts leader and follower with empty logs, then gives the leader three entries. A
 * manually triggered heartbeat must deliver the entry at index 0, and handling the
 * follower's reply to it must immediately deliver the entry at index 1.
 */
1468 public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1469 logStart("testAppendEntryCallAtEndofAppendEntryReply");
1471 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
// A fresh config replaces the one from createActorContext(); only the isolated leader check
// interval is customized here and the same instance is shared by leader and follower.
1473 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1474 //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1475 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1477 leaderActorContext.setConfigParams(configParams);
1479 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1481 followerActorContext.setConfigParams(configParams);
1482 followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
// Install a real Follower behavior on the follower actor.
1484 Follower follower = new Follower(followerActorContext);
1485 followerActor.underlyingActor().setBehavior(follower);
// Both sides start with an emptied log and commit index / last applied of -1.
1487 leaderActorContext.getReplicatedLog().removeFrom(0);
1488 leaderActorContext.setCommitIndex(-1);
1489 leaderActorContext.setLastApplied(-1);
1491 followerActorContext.getReplicatedLog().removeFrom(0);
1492 followerActorContext.setCommitIndex(-1);
1493 followerActorContext.setLastApplied(-1);
1495 leader = new Leader(leaderActorContext);
// Feed the follower's reply to the initial AppendEntries back into the leader.
1497 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1498 leaderActor, AppendEntriesReply.class);
1500 leader.handleMessage(followerActor, appendEntriesReply);
1502 // Clear initial heartbeat messages
1504 leaderActor.underlyingActor().clear();
1505 followerActor.underlyingActor().clear();
// Give the leader three entries with commit index 1; the follower's log is still empty.
1508 leaderActorContext.setReplicatedLog(
1509 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1510 leaderActorContext.setCommitIndex(1);
1511 leaderActorContext.setLastApplied(1);
// Let one heartbeat interval elapse, then trigger the heartbeat manually.
1513 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1514 TimeUnit.MILLISECONDS);
1516 leader.handleMessage(leaderActor, new SendHeartBeat());
1518 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1520 // Should send first log entry
1521 assertEquals(1, appendEntries.getLeaderCommit());
1522 assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1523 assertEquals(-1, appendEntries.getPrevLogIndex());
// The follower's reply must report the entry it just stored: index 0 in term 1.
1525 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1527 assertEquals(1, appendEntriesReply.getLogLastTerm());
1528 assertEquals(0, appendEntriesReply.getLogLastIndex());
1530 followerActor.underlyingActor().clear();
// No heartbeat is triggered here: the next AppendEntries must result from handling the reply.
1532 leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1534 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1536 // Should send second log entry
1537 assertEquals(1, appendEntries.getLeaderCommit());
1538 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
/**
 * Regression test referencing bug 2733: while follower 1 keeps sending successful
 * AppendEntriesReply messages, the leader must still receive more than one SendHeartBeat and
 * follower 2 must still receive more than one AppendEntries.
 */
1544 public void testLaggingFollowerStarvation() throws Exception {
1545 logStart("testLaggingFollowerStarvation");
1546 new JavaTestKit(getSystem()) {{
// Dedicated actors for this test: one leader actor that forwards messages to its behavior
// and two followers that only collect what they receive.
1547 String leaderActorId = actorFactory.generateActorId("leader");
1548 String follower1ActorId = actorFactory.generateActorId("follower");
1549 String follower2ActorId = actorFactory.generateActorId("follower");
1551 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1552 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1553 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1554 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1556 MockRaftActorContext leaderActorContext =
1557 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
// 200 ms heartbeat interval; the loop below sleeps for the same 200 ms between replies.
1559 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1560 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1561 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1563 leaderActorContext.setConfigParams(configParams);
1565 leaderActorContext.setReplicatedLog(
1566 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1568 Map<String, String> peerAddresses = new HashMap<>();
1569 peerAddresses.put(follower1ActorId,
1570 follower1Actor.path().toString());
1571 peerAddresses.put(follower2ActorId,
1572 follower2Actor.path().toString());
1574 leaderActorContext.setPeerAddresses(peerAddresses);
1575 leaderActorContext.getTermInformation().update(1, leaderActorId);
// This local `leader` is the behavior returned by createBehavior(...) and is installed on the
// local leader actor declared above.
1577 RaftActorBehavior leader = createBehavior(leaderActorContext);
1579 leaderActor.underlyingActor().setBehavior(leader);
// Five successful replies from follower 1 with last index 1..5, 200 ms apart.
1581 for(int i=1;i<6;i++) {
1582 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1583 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1584 assertTrue(newBehavior == leader);
1585 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1588 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1589 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1591 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1592 heartbeats.size() > 1);
1594 // Check if follower-2 got AppendEntries during this time and was not starved
1595 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1597 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1598 appendEntries.size() > 1);
/**
 * Runs the inherited newer-term check and additionally asserts that the context's votedFor
 * is null afterwards.
 *
 * @param actorContext the context passed to the inherited check and inspected afterwards
 * @param actorRef passed through to the inherited check
 * @param rpc passed through to the inherited check
 * @throws Exception declared by the inherited check
 */
1604 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1605 ActorRef actorRef, RaftRPC rpc) throws Exception {
1606 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1607 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
/**
 * Config variant whose election timeout interval and snapshot chunk size are fixed by the
 * values passed to the constructor.
 */
1610 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
// Election timeout in milliseconds, returned by getElectionTimeOutInterval().
1612 private final long electionTimeOutIntervalMillis;
// Chunk size returned by getSnapshotChunkSize().
1613 private final int snapshotChunkSize;
// Stores both values unchanged; no validation is performed here.
1615 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1617 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1618 this.snapshotChunkSize = snapshotChunkSize;
// Wraps the stored millisecond value in a new FiniteDuration on every call.
1622 public FiniteDuration getElectionTimeOutInterval() {
1623 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
// Returns the stored chunk size.
1627 public int getSnapshotChunkSize() {
1628 return snapshotChunkSize;