1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
29 import org.opendaylight.controller.cluster.raft.SerializationUtils;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
34 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
37 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
40 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
45 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
46 import scala.concurrent.duration.FiniteDuration;
48 public class LeaderTest extends AbstractRaftActorBehaviorTest {
50 static final String FOLLOWER_ID = "follower";
52 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
53 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
55 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
56 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
58 private Leader leader;
62 public void tearDown() throws Exception {
71 public void testHandleMessageForUnknownMessage() throws Exception {
72 logStart("testHandleMessageForUnknownMessage");
74 leader = new Leader(createActorContext());
76 // handle message should return the Leader state when it receives an
78 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
79 Assert.assertTrue(behavior instanceof Leader);
83 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
84 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
86 MockRaftActorContext actorContext = createActorContextWithFollower();
89 actorContext.getTermInformation().update(term, "");
91 leader = new Leader(actorContext);
93 // Leader should send an immediate heartbeat with no entries as follower is inactive.
94 long lastIndex = actorContext.getReplicatedLog().lastIndex();
95 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
96 assertEquals("getTerm", term, appendEntries.getTerm());
97 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
98 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
99 assertEquals("Entries size", 0, appendEntries.getEntries().size());
101 // The follower would normally reply - simulate that explicitly here.
102 leader.handleMessage(followerActor, new AppendEntriesReply(
103 FOLLOWER_ID, term, true, lastIndex - 1, term));
104 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
106 followerActor.underlyingActor().clear();
108 // Sleep for the heartbeat interval so AppendEntries is sent.
109 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
110 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
112 leader.handleMessage(leaderActor, new SendHeartBeat());
114 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
115 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
116 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
117 assertEquals("Entries size", 1, appendEntries.getEntries().size());
118 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
119 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
123 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
124 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
126 MockRaftActorContext actorContext = createActorContextWithFollower();
129 actorContext.getTermInformation().update(term, "");
131 leader = new Leader(actorContext);
133 // Leader will send an immediate heartbeat - ignore it.
134 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
136 // The follower would normally reply - simulate that explicitly here.
137 long lastIndex = actorContext.getReplicatedLog().lastIndex();
138 leader.handleMessage(followerActor, new AppendEntriesReply(
139 FOLLOWER_ID, term, true, lastIndex, term));
140 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
142 followerActor.underlyingActor().clear();
144 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
145 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
146 1, lastIndex + 1, payload);
147 actorContext.getReplicatedLog().append(newEntry);
148 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
149 new Replicate(null, null, newEntry));
151 // State should not change
152 assertTrue(raftBehavior instanceof Leader);
154 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
155 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
156 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
157 assertEquals("Entries size", 1, appendEntries.getEntries().size());
158 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
159 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
160 assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
164 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
165 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
167 MockRaftActorContext actorContext = createActorContext();
169 leader = new Leader(actorContext);
171 actorContext.setLastApplied(0);
173 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
174 long term = actorContext.getTermInformation().getCurrentTerm();
175 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
176 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
178 actorContext.getReplicatedLog().append(newEntry);
180 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
181 new Replicate(leaderActor, "state-id", newEntry));
183 // State should not change
184 assertTrue(raftBehavior instanceof Leader);
186 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
188 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
189 // one since lastApplied state is 0.
190 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
191 leaderActor, ApplyState.class);
192 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
194 for(int i = 0; i <= newLogIndex - 1; i++ ) {
195 ApplyState applyState = applyStateList.get(i);
196 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
197 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
200 ApplyState last = applyStateList.get((int) newLogIndex - 1);
201 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
202 assertEquals("getIdentifier", "state-id", last.getIdentifier());
206 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
207 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
209 MockRaftActorContext actorContext = createActorContextWithFollower();
211 Map<String, String> leadersSnapshot = new HashMap<>();
212 leadersSnapshot.put("1", "A");
213 leadersSnapshot.put("2", "B");
214 leadersSnapshot.put("3", "C");
217 actorContext.getReplicatedLog().removeFrom(0);
219 final int followersLastIndex = 2;
220 final int snapshotIndex = 3;
221 final int newEntryIndex = 4;
222 final int snapshotTerm = 1;
223 final int currentTerm = 2;
225 // set the snapshot variables in replicatedlog
226 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
227 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
228 actorContext.setCommitIndex(followersLastIndex);
229 //set follower timeout to 2 mins, helps during debugging
230 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
232 leader = new Leader(actorContext);
235 ReplicatedLogImplEntry entry =
236 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
237 new MockRaftActorContext.MockPayload("D"));
239 //update follower timestamp
240 leader.markFollowerActive(FOLLOWER_ID);
242 ByteString bs = toByteString(leadersSnapshot);
243 leader.setSnapshot(Optional.of(bs));
244 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
245 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
247 //send first chunk and no InstallSnapshotReply received yet
249 fts.incrementChunkIndex();
251 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
252 TimeUnit.MILLISECONDS);
254 leader.handleMessage(leaderActor, new SendHeartBeat());
256 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
258 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
260 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
262 //InstallSnapshotReply received
263 fts.markSendStatus(true);
265 leader.handleMessage(leaderActor, new SendHeartBeat());
267 InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor,
268 InstallSnapshot.SERIALIZABLE_CLASS);
270 InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
272 assertEquals(snapshotIndex, is.getLastIncludedIndex());
276 public void testSendAppendEntriesSnapshotScenario() throws Exception {
277 logStart("testSendAppendEntriesSnapshotScenario");
279 MockRaftActorContext actorContext = createActorContextWithFollower();
281 Map<String, String> leadersSnapshot = new HashMap<>();
282 leadersSnapshot.put("1", "A");
283 leadersSnapshot.put("2", "B");
284 leadersSnapshot.put("3", "C");
287 actorContext.getReplicatedLog().removeFrom(0);
289 final int followersLastIndex = 2;
290 final int snapshotIndex = 3;
291 final int newEntryIndex = 4;
292 final int snapshotTerm = 1;
293 final int currentTerm = 2;
295 // set the snapshot variables in replicatedlog
296 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
297 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
298 actorContext.setCommitIndex(followersLastIndex);
300 leader = new Leader(actorContext);
302 // Leader will send an immediate heartbeat - ignore it.
303 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
306 ReplicatedLogImplEntry entry =
307 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
308 new MockRaftActorContext.MockPayload("D"));
310 //update follower timestamp
311 leader.markFollowerActive(FOLLOWER_ID);
313 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
314 RaftActorBehavior raftBehavior = leader.handleMessage(
315 leaderActor, new Replicate(null, "state-id", entry));
317 assertTrue(raftBehavior instanceof Leader);
319 MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
323 public void testInitiateInstallSnapshot() throws Exception {
324 logStart("testInitiateInstallSnapshot");
326 MockRaftActorContext actorContext = createActorContextWithFollower();
328 Map<String, String> leadersSnapshot = new HashMap<>();
329 leadersSnapshot.put("1", "A");
330 leadersSnapshot.put("2", "B");
331 leadersSnapshot.put("3", "C");
334 actorContext.getReplicatedLog().removeFrom(0);
336 final int followersLastIndex = 2;
337 final int snapshotIndex = 3;
338 final int newEntryIndex = 4;
339 final int snapshotTerm = 1;
340 final int currentTerm = 2;
342 // set the snapshot variables in replicatedlog
343 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
344 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
345 actorContext.setLastApplied(3);
346 actorContext.setCommitIndex(followersLastIndex);
348 leader = new Leader(actorContext);
350 // Leader will send an immediate heartbeat - ignore it.
351 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
353 // set the snapshot as absent and check if capture-snapshot is invoked.
354 leader.setSnapshot(Optional.<ByteString>absent());
357 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
358 new MockRaftActorContext.MockPayload("D"));
360 actorContext.getReplicatedLog().append(entry);
362 //update follower timestamp
363 leader.markFollowerActive(FOLLOWER_ID);
365 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
367 CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
369 assertTrue(cs.isInstallSnapshotInitiated());
370 assertEquals(3, cs.getLastAppliedIndex());
371 assertEquals(1, cs.getLastAppliedTerm());
372 assertEquals(4, cs.getLastIndex());
373 assertEquals(2, cs.getLastTerm());
375 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
376 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
378 List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
379 assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
383 public void testInstallSnapshot() throws Exception {
384 logStart("testInstallSnapshot");
386 MockRaftActorContext actorContext = createActorContextWithFollower();
388 Map<String, String> leadersSnapshot = new HashMap<>();
389 leadersSnapshot.put("1", "A");
390 leadersSnapshot.put("2", "B");
391 leadersSnapshot.put("3", "C");
394 actorContext.getReplicatedLog().removeFrom(0);
396 final int followersLastIndex = 2;
397 final int snapshotIndex = 3;
398 final int snapshotTerm = 1;
399 final int currentTerm = 2;
401 // set the snapshot variables in replicatedlog
402 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
403 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
404 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
405 actorContext.setCommitIndex(followersLastIndex);
407 leader = new Leader(actorContext);
409 // Ignore initial heartbeat.
410 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
412 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
413 new SendInstallSnapshot(toByteString(leadersSnapshot)));
415 assertTrue(raftBehavior instanceof Leader);
417 // check if installsnapshot gets called with the correct values.
419 InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable(
420 MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class));
422 assertNotNull(installSnapshot.getData());
423 assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
424 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
426 assertEquals(currentTerm, installSnapshot.getTerm());
430 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
431 logStart("testHandleInstallSnapshotReplyLastChunk");
433 MockRaftActorContext actorContext = createActorContextWithFollower();
435 final int followersLastIndex = 2;
436 final int snapshotIndex = 3;
437 final int snapshotTerm = 1;
438 final int currentTerm = 2;
440 actorContext.setCommitIndex(followersLastIndex);
442 leader = new Leader(actorContext);
444 // Ignore initial heartbeat.
445 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
447 Map<String, String> leadersSnapshot = new HashMap<>();
448 leadersSnapshot.put("1", "A");
449 leadersSnapshot.put("2", "B");
450 leadersSnapshot.put("3", "C");
452 // set the snapshot variables in replicatedlog
454 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
455 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
456 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
458 ByteString bs = toByteString(leadersSnapshot);
459 leader.setSnapshot(Optional.of(bs));
460 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
461 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
462 while(!fts.isLastChunk(fts.getChunkIndex())) {
464 fts.incrementChunkIndex();
468 actorContext.getReplicatedLog().removeFrom(0);
470 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
471 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
473 assertTrue(raftBehavior instanceof Leader);
475 assertEquals(0, leader.followerSnapshotSize());
476 assertEquals(1, leader.followerLogSize());
477 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
479 assertEquals(snapshotIndex, fli.getMatchIndex());
480 assertEquals(snapshotIndex, fli.getMatchIndex());
481 assertEquals(snapshotIndex + 1, fli.getNextIndex());
485 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
486 logStart("testSendSnapshotfromInstallSnapshotReply");
488 MockRaftActorContext actorContext = createActorContextWithFollower();
490 final int followersLastIndex = 2;
491 final int snapshotIndex = 3;
492 final int snapshotTerm = 1;
493 final int currentTerm = 2;
495 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
497 public int getSnapshotChunkSize() {
501 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
502 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
504 actorContext.setConfigParams(configParams);
505 actorContext.setCommitIndex(followersLastIndex);
507 leader = new Leader(actorContext);
509 Map<String, String> leadersSnapshot = new HashMap<>();
510 leadersSnapshot.put("1", "A");
511 leadersSnapshot.put("2", "B");
512 leadersSnapshot.put("3", "C");
514 // set the snapshot variables in replicatedlog
515 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
516 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
517 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
519 ByteString bs = toByteString(leadersSnapshot);
520 leader.setSnapshot(Optional.of(bs));
522 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
524 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
525 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
527 assertEquals(1, installSnapshot.getChunkIndex());
528 assertEquals(3, installSnapshot.getTotalChunks());
530 followerActor.underlyingActor().clear();
531 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
532 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
534 installSnapshot = MessageCollectorActor.expectFirstMatching(
535 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
537 assertEquals(2, installSnapshot.getChunkIndex());
538 assertEquals(3, installSnapshot.getTotalChunks());
540 followerActor.underlyingActor().clear();
541 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
542 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
544 installSnapshot = MessageCollectorActor.expectFirstMatching(
545 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
547 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
548 followerActor.underlyingActor().clear();
549 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
550 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
552 installSnapshot = MessageCollectorActor.getFirstMatching(
553 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
555 Assert.assertNull(installSnapshot);
560 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
561 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
563 MockRaftActorContext actorContext = createActorContextWithFollower();
565 final int followersLastIndex = 2;
566 final int snapshotIndex = 3;
567 final int snapshotTerm = 1;
568 final int currentTerm = 2;
570 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
572 public int getSnapshotChunkSize() {
577 actorContext.setCommitIndex(followersLastIndex);
579 leader = new Leader(actorContext);
581 Map<String, String> leadersSnapshot = new HashMap<>();
582 leadersSnapshot.put("1", "A");
583 leadersSnapshot.put("2", "B");
584 leadersSnapshot.put("3", "C");
586 // set the snapshot variables in replicatedlog
587 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
588 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
589 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
591 ByteString bs = toByteString(leadersSnapshot);
592 leader.setSnapshot(Optional.of(bs));
594 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
596 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
597 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
599 assertEquals(1, installSnapshot.getChunkIndex());
600 assertEquals(3, installSnapshot.getTotalChunks());
602 followerActor.underlyingActor().clear();
604 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
605 FOLLOWER_ID, -1, false));
607 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
608 TimeUnit.MILLISECONDS);
610 leader.handleMessage(leaderActor, new SendHeartBeat());
612 installSnapshot = MessageCollectorActor.expectFirstMatching(
613 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
615 assertEquals(1, installSnapshot.getChunkIndex());
616 assertEquals(3, installSnapshot.getTotalChunks());
620 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
621 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
623 MockRaftActorContext actorContext = createActorContextWithFollower();
625 final int followersLastIndex = 2;
626 final int snapshotIndex = 3;
627 final int snapshotTerm = 1;
628 final int currentTerm = 2;
630 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
632 public int getSnapshotChunkSize() {
637 actorContext.setCommitIndex(followersLastIndex);
639 leader = new Leader(actorContext);
641 Map<String, String> leadersSnapshot = new HashMap<>();
642 leadersSnapshot.put("1", "A");
643 leadersSnapshot.put("2", "B");
644 leadersSnapshot.put("3", "C");
646 // set the snapshot variables in replicatedlog
647 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
648 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
649 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
651 ByteString bs = toByteString(leadersSnapshot);
652 leader.setSnapshot(Optional.of(bs));
654 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
656 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
657 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
659 assertEquals(1, installSnapshot.getChunkIndex());
660 assertEquals(3, installSnapshot.getTotalChunks());
661 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
663 int hashCode = installSnapshot.getData().hashCode();
665 followerActor.underlyingActor().clear();
667 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
668 FOLLOWER_ID, 1, true));
670 installSnapshot = MessageCollectorActor.expectFirstMatching(
671 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
673 assertEquals(2, installSnapshot.getChunkIndex());
674 assertEquals(3, installSnapshot.getTotalChunks());
675 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
679 public void testFollowerToSnapshotLogic() {
680 logStart("testFollowerToSnapshotLogic");
682 MockRaftActorContext actorContext = createActorContext();
684 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
686 public int getSnapshotChunkSize() {
691 leader = new Leader(actorContext);
693 Map<String, String> leadersSnapshot = new HashMap<>();
694 leadersSnapshot.put("1", "A");
695 leadersSnapshot.put("2", "B");
696 leadersSnapshot.put("3", "C");
698 ByteString bs = toByteString(leadersSnapshot);
699 byte[] barray = bs.toByteArray();
701 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
702 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
704 assertEquals(bs.size(), barray.length);
707 for (int i=0; i < barray.length; i = i + 50) {
711 if (i + 50 > barray.length) {
715 ByteString chunk = fts.getNextChunk();
716 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
717 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
719 fts.markSendStatus(true);
720 if (!fts.isLastChunk(chunkIndex)) {
721 fts.incrementChunkIndex();
725 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
728 @Override protected RaftActorBehavior createBehavior(
729 RaftActorContext actorContext) {
730 return new Leader(actorContext);
734 protected MockRaftActorContext createActorContext() {
735 return createActorContext(leaderActor);
739 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
740 return createActorContext("leader", actorRef);
743 private MockRaftActorContext createActorContextWithFollower() {
744 MockRaftActorContext actorContext = createActorContext();
745 actorContext.setPeerAddresses(ImmutableMap.<String,String>builder().put(FOLLOWER_ID,
746 followerActor.path().toString()).build());
750 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
751 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
752 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
753 configParams.setElectionTimeoutFactor(100000);
754 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
755 context.setConfigParams(configParams);
759 public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
760 AbstractRaftActorBehavior behavior;
762 @Override public void onReceive(Object message) throws Exception {
763 if(behavior != null) {
764 behavior.handleMessage(sender(), message);
767 super.onReceive(message);
770 public static Props props() {
771 return Props.create(ForwardMessageToBehaviorActor.class);
776 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
777 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
779 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
781 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
783 Follower follower = new Follower(followerActorContext);
784 followerActor.underlyingActor().behavior = follower;
786 Map<String, String> peerAddresses = new HashMap<>();
787 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
789 leaderActorContext.setPeerAddresses(peerAddresses);
791 leaderActorContext.getReplicatedLog().removeFrom(0);
794 leaderActorContext.setReplicatedLog(
795 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
797 leaderActorContext.setCommitIndex(1);
799 followerActorContext.getReplicatedLog().removeFrom(0);
801 // follower too has the exact same log entries and has the same commit index
802 followerActorContext.setReplicatedLog(
803 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
805 followerActorContext.setCommitIndex(1);
807 leader = new Leader(leaderActorContext);
809 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
811 assertEquals(1, appendEntries.getLeaderCommit());
812 assertEquals(0, appendEntries.getEntries().size());
813 assertEquals(0, appendEntries.getPrevLogIndex());
815 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
816 leaderActor, AppendEntriesReply.class);
818 assertEquals(2, appendEntriesReply.getLogLastIndex());
819 assertEquals(1, appendEntriesReply.getLogLastTerm());
821 // follower returns its next index
822 assertEquals(2, appendEntriesReply.getLogLastIndex());
823 assertEquals(1, appendEntriesReply.getLogLastTerm());
829 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
830 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
832 MockRaftActorContext leaderActorContext = createActorContext();
834 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
836 Follower follower = new Follower(followerActorContext);
837 followerActor.underlyingActor().behavior = follower;
839 Map<String, String> peerAddresses = new HashMap<>();
840 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
842 leaderActorContext.setPeerAddresses(peerAddresses);
844 leaderActorContext.getReplicatedLog().removeFrom(0);
846 leaderActorContext.setReplicatedLog(
847 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
849 leaderActorContext.setCommitIndex(1);
851 followerActorContext.getReplicatedLog().removeFrom(0);
853 followerActorContext.setReplicatedLog(
854 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
856 // follower has the same log entries but its commit index > leaders commit index
857 followerActorContext.setCommitIndex(2);
859 leader = new Leader(leaderActorContext);
862 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
864 assertEquals(1, appendEntries.getLeaderCommit());
865 assertEquals(0, appendEntries.getEntries().size());
866 assertEquals(0, appendEntries.getPrevLogIndex());
868 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
869 leaderActor, AppendEntriesReply.class);
871 assertEquals(2, appendEntriesReply.getLogLastIndex());
872 assertEquals(1, appendEntriesReply.getLogLastTerm());
874 leaderActor.underlyingActor().behavior = leader;
875 leader.handleMessage(followerActor, appendEntriesReply);
877 leaderActor.underlyingActor().clear();
878 followerActor.underlyingActor().clear();
880 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
881 TimeUnit.MILLISECONDS);
883 leader.handleMessage(leaderActor, new SendHeartBeat());
885 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
887 assertEquals(2, appendEntries.getLeaderCommit());
888 assertEquals(0, appendEntries.getEntries().size());
889 assertEquals(2, appendEntries.getPrevLogIndex());
891 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
893 assertEquals(2, appendEntriesReply.getLogLastIndex());
894 assertEquals(1, appendEntriesReply.getLogLastTerm());
896 assertEquals(2, followerActorContext.getCommitIndex());
902 public void testHandleAppendEntriesReplyFailure(){
903 logStart("testHandleAppendEntriesReplyFailure");
905 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
907 leader = new Leader(leaderActorContext);
909 // Send initial heartbeat reply with last index.
910 leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
912 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
913 assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
915 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
917 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
919 assertEquals(RaftState.Leader, raftActorBehavior.state());
921 assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
925 public void testHandleAppendEntriesReplySuccess() throws Exception {
926 logStart("testHandleAppendEntriesReplySuccess");
928 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
930 leaderActorContext.setReplicatedLog(
931 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
933 leaderActorContext.setCommitIndex(1);
934 leaderActorContext.setLastApplied(1);
935 leaderActorContext.getTermInformation().update(1, "leader");
937 leader = new Leader(leaderActorContext);
939 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
941 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
943 assertEquals(RaftState.Leader, raftActorBehavior.state());
945 assertEquals(2, leaderActorContext.getCommitIndex());
947 ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching(
948 leaderActor, ApplyLogEntries.class);
950 assertEquals(2, leaderActorContext.getLastApplied());
952 assertEquals(2, applyLogEntries.getToIndex());
954 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
957 assertEquals(1,applyStateList.size());
959 ApplyState applyState = applyStateList.get(0);
961 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
965 public void testHandleAppendEntriesReplyUnknownFollower(){
966 logStart("testHandleAppendEntriesReplyUnknownFollower");
968 MockRaftActorContext leaderActorContext = createActorContext();
970 leader = new Leader(leaderActorContext);
972 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
974 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
976 assertEquals(RaftState.Leader, raftActorBehavior.state());
980 public void testHandleRequestVoteReply(){
981 logStart("testHandleRequestVoteReply");
983 MockRaftActorContext leaderActorContext = createActorContext();
985 leader = new Leader(leaderActorContext);
987 // Should be a no-op.
988 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
989 new RequestVoteReply(1, true));
991 assertEquals(RaftState.Leader, raftActorBehavior.state());
993 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
995 assertEquals(RaftState.Leader, raftActorBehavior.state());
999 public void testIsolatedLeaderCheckNoFollowers() {
1000 logStart("testIsolatedLeaderCheckNoFollowers");
1002 MockRaftActorContext leaderActorContext = createActorContext();
1004 leader = new Leader(leaderActorContext);
1005 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1006 Assert.assertTrue(behavior instanceof Leader);
1010 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1011 logStart("testIsolatedLeaderCheckTwoFollowers");
1013 new JavaTestKit(getSystem()) {{
1015 ActorRef followerActor1 = getTestActor();
1016 ActorRef followerActor2 = getTestActor();
1018 MockRaftActorContext leaderActorContext = createActorContext();
1020 Map<String, String> peerAddresses = new HashMap<>();
1021 peerAddresses.put("follower-1", followerActor1.path().toString());
1022 peerAddresses.put("follower-2", followerActor2.path().toString());
1024 leaderActorContext.setPeerAddresses(peerAddresses);
1026 leader = new Leader(leaderActorContext);
1028 leader.markFollowerActive("follower-1");
1029 leader.markFollowerActive("follower-2");
1030 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1031 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1032 behavior instanceof Leader);
1034 // kill 1 follower and verify if that got killed
1035 final JavaTestKit probe = new JavaTestKit(getSystem());
1036 probe.watch(followerActor1);
1037 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1038 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1039 assertEquals(termMsg1.getActor(), followerActor1);
1041 leader.markFollowerInActive("follower-1");
1042 leader.markFollowerActive("follower-2");
1043 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1044 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1045 behavior instanceof Leader);
1047 // kill 2nd follower and leader should change to Isolated leader
1048 followerActor2.tell(PoisonPill.getInstance(), null);
1049 probe.watch(followerActor2);
1050 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1051 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1052 assertEquals(termMsg2.getActor(), followerActor2);
1054 leader.markFollowerInActive("follower-2");
1055 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1056 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1057 behavior instanceof IsolatedLeader);
1063 public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1064 logStart("testAppendEntryCallAtEndofAppendEntryReply");
1066 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1068 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1069 //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1070 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1072 leaderActorContext.setConfigParams(configParams);
1074 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1076 followerActorContext.setConfigParams(configParams);
1078 Follower follower = new Follower(followerActorContext);
1079 followerActor.underlyingActor().behavior = follower;
1081 leaderActorContext.getReplicatedLog().removeFrom(0);
1082 leaderActorContext.setCommitIndex(-1);
1083 leaderActorContext.setLastApplied(-1);
1085 followerActorContext.getReplicatedLog().removeFrom(0);
1086 followerActorContext.setCommitIndex(-1);
1087 followerActorContext.setLastApplied(-1);
1089 leader = new Leader(leaderActorContext);
1091 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1092 leaderActor, AppendEntriesReply.class);
1094 leader.handleMessage(followerActor, appendEntriesReply);
1096 // Clear initial heartbeat messages
1098 leaderActor.underlyingActor().clear();
1099 followerActor.underlyingActor().clear();
1102 leaderActorContext.setReplicatedLog(
1103 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1104 leaderActorContext.setCommitIndex(1);
1105 leaderActorContext.setLastApplied(1);
1107 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1108 TimeUnit.MILLISECONDS);
1110 leader.handleMessage(leaderActor, new SendHeartBeat());
1112 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1114 // Should send first log entry
1115 assertEquals(1, appendEntries.getLeaderCommit());
1116 assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1117 assertEquals(-1, appendEntries.getPrevLogIndex());
1119 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1121 assertEquals(1, appendEntriesReply.getLogLastTerm());
1122 assertEquals(0, appendEntriesReply.getLogLastIndex());
1124 followerActor.underlyingActor().clear();
1126 leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1128 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1130 // Should send second log entry
1131 assertEquals(1, appendEntries.getLeaderCommit());
1132 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1138 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1139 ActorRef actorRef, RaftRPC rpc) throws Exception {
1140 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1141 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1144 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1146 private final long electionTimeOutIntervalMillis;
1147 private final int snapshotChunkSize;
1149 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1151 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1152 this.snapshotChunkSize = snapshotChunkSize;
1156 public FiniteDuration getElectionTimeOutInterval() {
1157 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1161 public int getSnapshotChunkSize() {
1162 return snapshotChunkSize;