1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.io.ByteArrayOutputStream;
17 import java.io.IOException;
18 import java.io.ObjectOutputStream;
19 import java.util.HashMap;
20 import java.util.List;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.After;
24 import org.junit.Assert;
25 import org.junit.Test;
26 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
27 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
28 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
29 import org.opendaylight.controller.cluster.raft.RaftActorContext;
30 import org.opendaylight.controller.cluster.raft.RaftState;
31 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
32 import org.opendaylight.controller.cluster.raft.SerializationUtils;
33 import org.opendaylight.controller.cluster.raft.TestActorFactory;
34 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
35 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
36 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
37 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
38 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
39 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
40 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
43 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
44 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
45 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
46 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
47 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
48 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
49 import org.slf4j.LoggerFactory;
50 import scala.concurrent.duration.FiniteDuration;
52 public class LeaderTest extends AbstractRaftActorBehaviorTest {
54 static final String FOLLOWER_ID = "follower";
56 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
58 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
59 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
61 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
62 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
64 private Leader leader;
67 public void tearDown() throws Exception {
75 private void logStart(String name) {
76 LoggerFactory.getLogger(LeaderTest.class).info("Starting " + name);
80 public void testHandleMessageForUnknownMessage() throws Exception {
81 logStart("testHandleMessageForUnknownMessage");
83 leader = new Leader(createActorContext());
85 // handle message should return the Leader state when it receives an
87 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
88 Assert.assertTrue(behavior instanceof Leader);
92 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
93 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
95 MockRaftActorContext actorContext = createActorContextWithFollower();
98 actorContext.getTermInformation().update(term, "");
100 leader = new Leader(actorContext);
102 // Leader should send an immediate heartbeat with no entries as follower is inactive.
103 long lastIndex = actorContext.getReplicatedLog().lastIndex();
104 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
105 assertEquals("getTerm", term, appendEntries.getTerm());
106 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
107 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
108 assertEquals("Entries size", 0, appendEntries.getEntries().size());
110 // The follower would normally reply - simulate that explicitly here.
111 leader.handleMessage(followerActor, new AppendEntriesReply(
112 FOLLOWER_ID, term, true, lastIndex - 1, term));
113 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
115 followerActor.underlyingActor().clear();
117 // Sleep for the heartbeat interval so AppendEntries is sent.
118 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
119 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
121 leader.handleMessage(leaderActor, new SendHeartBeat());
123 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
124 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
125 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
126 assertEquals("Entries size", 1, appendEntries.getEntries().size());
127 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
128 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
132 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
133 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
135 MockRaftActorContext actorContext = createActorContextWithFollower();
138 actorContext.getTermInformation().update(term, "");
140 leader = new Leader(actorContext);
142 // Leader will send an immediate heartbeat - ignore it.
143 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
145 // The follower would normally reply - simulate that explicitly here.
146 long lastIndex = actorContext.getReplicatedLog().lastIndex();
147 leader.handleMessage(followerActor, new AppendEntriesReply(
148 FOLLOWER_ID, term, true, lastIndex, term));
149 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
151 followerActor.underlyingActor().clear();
153 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
154 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
155 1, lastIndex + 1, payload);
156 actorContext.getReplicatedLog().append(newEntry);
157 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
158 new Replicate(null, null, newEntry));
160 // State should not change
161 assertTrue(raftBehavior instanceof Leader);
163 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
164 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
165 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
166 assertEquals("Entries size", 1, appendEntries.getEntries().size());
167 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
168 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
169 assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
173 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
174 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
176 MockRaftActorContext actorContext = createActorContext();
178 leader = new Leader(actorContext);
180 actorContext.setLastApplied(0);
182 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
183 long term = actorContext.getTermInformation().getCurrentTerm();
184 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
185 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
187 actorContext.getReplicatedLog().append(newEntry);
189 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
190 new Replicate(leaderActor, "state-id", newEntry));
192 // State should not change
193 assertTrue(raftBehavior instanceof Leader);
195 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
197 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
198 // one since lastApplied state is 0.
199 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
200 leaderActor, ApplyState.class);
201 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
203 for(int i = 0; i <= newLogIndex - 1; i++ ) {
204 ApplyState applyState = applyStateList.get(i);
205 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
206 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
209 ApplyState last = applyStateList.get((int) newLogIndex - 1);
210 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
211 assertEquals("getIdentifier", "state-id", last.getIdentifier());
215 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
216 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
218 MockRaftActorContext actorContext = createActorContextWithFollower();
220 Map<String, String> leadersSnapshot = new HashMap<>();
221 leadersSnapshot.put("1", "A");
222 leadersSnapshot.put("2", "B");
223 leadersSnapshot.put("3", "C");
226 actorContext.getReplicatedLog().removeFrom(0);
228 final int followersLastIndex = 2;
229 final int snapshotIndex = 3;
230 final int newEntryIndex = 4;
231 final int snapshotTerm = 1;
232 final int currentTerm = 2;
234 // set the snapshot variables in replicatedlog
235 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
236 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
237 actorContext.setCommitIndex(followersLastIndex);
238 //set follower timeout to 2 mins, helps during debugging
239 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
241 leader = new Leader(actorContext);
244 ReplicatedLogImplEntry entry =
245 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
246 new MockRaftActorContext.MockPayload("D"));
248 //update follower timestamp
249 leader.markFollowerActive(FOLLOWER_ID);
251 ByteString bs = toByteString(leadersSnapshot);
252 leader.setSnapshot(Optional.of(bs));
253 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
254 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
256 //send first chunk and no InstallSnapshotReply received yet
258 fts.incrementChunkIndex();
260 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
261 TimeUnit.MILLISECONDS);
263 leader.handleMessage(leaderActor, new SendHeartBeat());
265 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
267 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
269 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
271 //InstallSnapshotReply received
272 fts.markSendStatus(true);
274 leader.handleMessage(leaderActor, new SendHeartBeat());
276 InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor,
277 InstallSnapshot.SERIALIZABLE_CLASS);
279 InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
281 assertEquals(snapshotIndex, is.getLastIncludedIndex());
285 public void testSendAppendEntriesSnapshotScenario() throws Exception {
286 logStart("testSendAppendEntriesSnapshotScenario");
288 MockRaftActorContext actorContext = createActorContextWithFollower();
290 Map<String, String> leadersSnapshot = new HashMap<>();
291 leadersSnapshot.put("1", "A");
292 leadersSnapshot.put("2", "B");
293 leadersSnapshot.put("3", "C");
296 actorContext.getReplicatedLog().removeFrom(0);
298 final int followersLastIndex = 2;
299 final int snapshotIndex = 3;
300 final int newEntryIndex = 4;
301 final int snapshotTerm = 1;
302 final int currentTerm = 2;
304 // set the snapshot variables in replicatedlog
305 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
306 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
307 actorContext.setCommitIndex(followersLastIndex);
309 leader = new Leader(actorContext);
311 // Leader will send an immediate heartbeat - ignore it.
312 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
315 ReplicatedLogImplEntry entry =
316 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
317 new MockRaftActorContext.MockPayload("D"));
319 //update follower timestamp
320 leader.markFollowerActive(FOLLOWER_ID);
322 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
323 RaftActorBehavior raftBehavior = leader.handleMessage(
324 leaderActor, new Replicate(null, "state-id", entry));
326 assertTrue(raftBehavior instanceof Leader);
328 MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
332 public void testInitiateInstallSnapshot() throws Exception {
333 logStart("testInitiateInstallSnapshot");
335 MockRaftActorContext actorContext = createActorContextWithFollower();
337 Map<String, String> leadersSnapshot = new HashMap<>();
338 leadersSnapshot.put("1", "A");
339 leadersSnapshot.put("2", "B");
340 leadersSnapshot.put("3", "C");
343 actorContext.getReplicatedLog().removeFrom(0);
345 final int followersLastIndex = 2;
346 final int snapshotIndex = 3;
347 final int newEntryIndex = 4;
348 final int snapshotTerm = 1;
349 final int currentTerm = 2;
351 // set the snapshot variables in replicatedlog
352 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
353 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
354 actorContext.setLastApplied(3);
355 actorContext.setCommitIndex(followersLastIndex);
357 leader = new Leader(actorContext);
359 // Leader will send an immediate heartbeat - ignore it.
360 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
362 // set the snapshot as absent and check if capture-snapshot is invoked.
363 leader.setSnapshot(Optional.<ByteString>absent());
366 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
367 new MockRaftActorContext.MockPayload("D"));
369 actorContext.getReplicatedLog().append(entry);
371 //update follower timestamp
372 leader.markFollowerActive(FOLLOWER_ID);
374 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
376 CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
378 assertTrue(cs.isInstallSnapshotInitiated());
379 assertEquals(3, cs.getLastAppliedIndex());
380 assertEquals(1, cs.getLastAppliedTerm());
381 assertEquals(4, cs.getLastIndex());
382 assertEquals(2, cs.getLastTerm());
384 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
385 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
387 List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
388 assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
392 public void testInstallSnapshot() throws Exception {
393 logStart("testInstallSnapshot");
395 MockRaftActorContext actorContext = createActorContextWithFollower();
397 Map<String, String> leadersSnapshot = new HashMap<>();
398 leadersSnapshot.put("1", "A");
399 leadersSnapshot.put("2", "B");
400 leadersSnapshot.put("3", "C");
403 actorContext.getReplicatedLog().removeFrom(0);
405 final int followersLastIndex = 2;
406 final int snapshotIndex = 3;
407 final int snapshotTerm = 1;
408 final int currentTerm = 2;
410 // set the snapshot variables in replicatedlog
411 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
412 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
413 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
414 actorContext.setCommitIndex(followersLastIndex);
416 leader = new Leader(actorContext);
418 // Ignore initial heartbeat.
419 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
421 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
422 new SendInstallSnapshot(toByteString(leadersSnapshot)));
424 assertTrue(raftBehavior instanceof Leader);
426 // check if installsnapshot gets called with the correct values.
428 InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable(
429 MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class));
431 assertNotNull(installSnapshot.getData());
432 assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
433 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
435 // FIXME - we don't set the term in the serialized message.
436 //assertEquals(currentTerm, installSnapshot.getTerm());
440 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
441 logStart("testHandleInstallSnapshotReplyLastChunk");
443 MockRaftActorContext actorContext = createActorContextWithFollower();
445 final int followersLastIndex = 2;
446 final int snapshotIndex = 3;
447 final int snapshotTerm = 1;
448 final int currentTerm = 2;
450 actorContext.setCommitIndex(followersLastIndex);
452 leader = new Leader(actorContext);
454 // Ignore initial heartbeat.
455 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
457 Map<String, String> leadersSnapshot = new HashMap<>();
458 leadersSnapshot.put("1", "A");
459 leadersSnapshot.put("2", "B");
460 leadersSnapshot.put("3", "C");
462 // set the snapshot variables in replicatedlog
464 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
465 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
466 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
468 ByteString bs = toByteString(leadersSnapshot);
469 leader.setSnapshot(Optional.of(bs));
470 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
471 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
472 while(!fts.isLastChunk(fts.getChunkIndex())) {
474 fts.incrementChunkIndex();
478 actorContext.getReplicatedLog().removeFrom(0);
480 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
481 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
483 assertTrue(raftBehavior instanceof Leader);
485 assertEquals(0, leader.followerSnapshotSize());
486 assertEquals(1, leader.followerLogSize());
487 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
489 assertEquals(snapshotIndex, fli.getMatchIndex());
490 assertEquals(snapshotIndex, fli.getMatchIndex());
491 assertEquals(snapshotIndex + 1, fli.getNextIndex());
495 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
496 logStart("testSendSnapshotfromInstallSnapshotReply");
498 MockRaftActorContext actorContext = createActorContextWithFollower();
500 final int followersLastIndex = 2;
501 final int snapshotIndex = 3;
502 final int snapshotTerm = 1;
503 final int currentTerm = 2;
505 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
507 public int getSnapshotChunkSize() {
511 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
512 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
514 actorContext.setConfigParams(configParams);
515 actorContext.setCommitIndex(followersLastIndex);
517 leader = new Leader(actorContext);
519 Map<String, String> leadersSnapshot = new HashMap<>();
520 leadersSnapshot.put("1", "A");
521 leadersSnapshot.put("2", "B");
522 leadersSnapshot.put("3", "C");
524 // set the snapshot variables in replicatedlog
525 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
526 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
527 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
529 ByteString bs = toByteString(leadersSnapshot);
530 leader.setSnapshot(Optional.of(bs));
532 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
534 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
535 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
537 assertEquals(1, installSnapshot.getChunkIndex());
538 assertEquals(3, installSnapshot.getTotalChunks());
540 followerActor.underlyingActor().clear();
541 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
542 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
544 installSnapshot = MessageCollectorActor.expectFirstMatching(
545 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
547 assertEquals(2, installSnapshot.getChunkIndex());
548 assertEquals(3, installSnapshot.getTotalChunks());
550 followerActor.underlyingActor().clear();
551 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
552 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
554 installSnapshot = MessageCollectorActor.expectFirstMatching(
555 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
557 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
558 followerActor.underlyingActor().clear();
559 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
560 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
562 installSnapshot = MessageCollectorActor.getFirstMatching(
563 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
565 Assert.assertNull(installSnapshot);
570 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
571 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
573 MockRaftActorContext actorContext = createActorContextWithFollower();
575 final int followersLastIndex = 2;
576 final int snapshotIndex = 3;
577 final int snapshotTerm = 1;
578 final int currentTerm = 2;
580 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
582 public int getSnapshotChunkSize() {
587 actorContext.setCommitIndex(followersLastIndex);
589 leader = new Leader(actorContext);
591 Map<String, String> leadersSnapshot = new HashMap<>();
592 leadersSnapshot.put("1", "A");
593 leadersSnapshot.put("2", "B");
594 leadersSnapshot.put("3", "C");
596 // set the snapshot variables in replicatedlog
597 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
598 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
599 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
601 ByteString bs = toByteString(leadersSnapshot);
602 leader.setSnapshot(Optional.of(bs));
604 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
606 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
607 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
609 assertEquals(1, installSnapshot.getChunkIndex());
610 assertEquals(3, installSnapshot.getTotalChunks());
612 followerActor.underlyingActor().clear();
614 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
615 FOLLOWER_ID, -1, false));
617 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
618 TimeUnit.MILLISECONDS);
620 leader.handleMessage(leaderActor, new SendHeartBeat());
622 installSnapshot = MessageCollectorActor.expectFirstMatching(
623 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
625 assertEquals(1, installSnapshot.getChunkIndex());
626 assertEquals(3, installSnapshot.getTotalChunks());
630 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
631 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
633 MockRaftActorContext actorContext = createActorContextWithFollower();
635 final int followersLastIndex = 2;
636 final int snapshotIndex = 3;
637 final int snapshotTerm = 1;
638 final int currentTerm = 2;
640 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
642 public int getSnapshotChunkSize() {
647 actorContext.setCommitIndex(followersLastIndex);
649 leader = new Leader(actorContext);
651 Map<String, String> leadersSnapshot = new HashMap<>();
652 leadersSnapshot.put("1", "A");
653 leadersSnapshot.put("2", "B");
654 leadersSnapshot.put("3", "C");
656 // set the snapshot variables in replicatedlog
657 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
658 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
659 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
661 ByteString bs = toByteString(leadersSnapshot);
662 leader.setSnapshot(Optional.of(bs));
664 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
666 InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
667 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
669 assertEquals(1, installSnapshot.getChunkIndex());
670 assertEquals(3, installSnapshot.getTotalChunks());
671 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
673 int hashCode = installSnapshot.getData().hashCode();
675 followerActor.underlyingActor().clear();
677 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
678 FOLLOWER_ID, 1, true));
680 installSnapshot = MessageCollectorActor.expectFirstMatching(
681 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
683 assertEquals(2, installSnapshot.getChunkIndex());
684 assertEquals(3, installSnapshot.getTotalChunks());
685 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
689 public void testFollowerToSnapshotLogic() {
690 logStart("testFollowerToSnapshotLogic");
692 MockRaftActorContext actorContext = createActorContext();
694 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
696 public int getSnapshotChunkSize() {
701 leader = new Leader(actorContext);
703 Map<String, String> leadersSnapshot = new HashMap<>();
704 leadersSnapshot.put("1", "A");
705 leadersSnapshot.put("2", "B");
706 leadersSnapshot.put("3", "C");
708 ByteString bs = toByteString(leadersSnapshot);
709 byte[] barray = bs.toByteArray();
711 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
712 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
714 assertEquals(bs.size(), barray.length);
717 for (int i=0; i < barray.length; i = i + 50) {
721 if (i + 50 > barray.length) {
725 ByteString chunk = fts.getNextChunk();
726 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
727 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
729 fts.markSendStatus(true);
730 if (!fts.isLastChunk(chunkIndex)) {
731 fts.incrementChunkIndex();
735 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
738 @Override protected RaftActorBehavior createBehavior(
739 RaftActorContext actorContext) {
740 return new Leader(actorContext);
744 protected MockRaftActorContext createActorContext() {
745 return createActorContext(leaderActor);
749 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
750 return createActorContext("leader", actorRef);
753 private MockRaftActorContext createActorContextWithFollower() {
754 MockRaftActorContext actorContext = createActorContext();
755 actorContext.setPeerAddresses(ImmutableMap.<String,String>builder().put(FOLLOWER_ID,
756 followerActor.path().toString()).build());
760 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
761 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
762 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
763 configParams.setElectionTimeoutFactor(100000);
764 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
765 context.setConfigParams(configParams);
769 private ByteString toByteString(Map<String, String> state) {
770 ByteArrayOutputStream b = null;
771 ObjectOutputStream o = null;
774 b = new ByteArrayOutputStream();
775 o = new ObjectOutputStream(b);
776 o.writeObject(state);
777 byte[] snapshotBytes = b.toByteArray();
778 return ByteString.copyFrom(snapshotBytes);
788 } catch (IOException e) {
789 Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
794 public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
795 AbstractRaftActorBehavior behavior;
797 @Override public void onReceive(Object message) throws Exception {
798 if(behavior != null) {
799 behavior.handleMessage(sender(), message);
802 super.onReceive(message);
805 public static Props props() {
806 return Props.create(ForwardMessageToBehaviorActor.class);
811 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
812 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
814 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
816 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
818 Follower follower = new Follower(followerActorContext);
819 followerActor.underlyingActor().behavior = follower;
821 Map<String, String> peerAddresses = new HashMap<>();
822 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
824 leaderActorContext.setPeerAddresses(peerAddresses);
826 leaderActorContext.getReplicatedLog().removeFrom(0);
829 leaderActorContext.setReplicatedLog(
830 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
832 leaderActorContext.setCommitIndex(1);
834 followerActorContext.getReplicatedLog().removeFrom(0);
836 // follower too has the exact same log entries and has the same commit index
837 followerActorContext.setReplicatedLog(
838 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
840 followerActorContext.setCommitIndex(1);
842 leader = new Leader(leaderActorContext);
844 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
846 assertEquals(1, appendEntries.getLeaderCommit());
847 assertEquals(0, appendEntries.getEntries().size());
848 assertEquals(0, appendEntries.getPrevLogIndex());
850 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
851 leaderActor, AppendEntriesReply.class);
853 assertEquals(2, appendEntriesReply.getLogLastIndex());
854 assertEquals(1, appendEntriesReply.getLogLastTerm());
856 // follower returns its next index
857 assertEquals(2, appendEntriesReply.getLogLastIndex());
858 assertEquals(1, appendEntriesReply.getLogLastTerm());
864 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
865 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
867 MockRaftActorContext leaderActorContext = createActorContext();
869 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
871 Follower follower = new Follower(followerActorContext);
872 followerActor.underlyingActor().behavior = follower;
874 Map<String, String> peerAddresses = new HashMap<>();
875 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
877 leaderActorContext.setPeerAddresses(peerAddresses);
879 leaderActorContext.getReplicatedLog().removeFrom(0);
881 leaderActorContext.setReplicatedLog(
882 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
884 leaderActorContext.setCommitIndex(1);
886 followerActorContext.getReplicatedLog().removeFrom(0);
888 followerActorContext.setReplicatedLog(
889 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
891 // follower has the same log entries but its commit index > leaders commit index
892 followerActorContext.setCommitIndex(2);
894 leader = new Leader(leaderActorContext);
897 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
899 assertEquals(1, appendEntries.getLeaderCommit());
900 assertEquals(0, appendEntries.getEntries().size());
901 assertEquals(0, appendEntries.getPrevLogIndex());
903 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
904 leaderActor, AppendEntriesReply.class);
906 assertEquals(2, appendEntriesReply.getLogLastIndex());
907 assertEquals(1, appendEntriesReply.getLogLastTerm());
909 leaderActor.underlyingActor().behavior = leader;
910 leader.handleMessage(followerActor, appendEntriesReply);
912 leaderActor.underlyingActor().clear();
913 followerActor.underlyingActor().clear();
915 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
916 TimeUnit.MILLISECONDS);
918 leader.handleMessage(leaderActor, new SendHeartBeat());
920 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
922 assertEquals(2, appendEntries.getLeaderCommit());
923 assertEquals(0, appendEntries.getEntries().size());
924 assertEquals(2, appendEntries.getPrevLogIndex());
926 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
928 assertEquals(2, appendEntriesReply.getLogLastIndex());
929 assertEquals(1, appendEntriesReply.getLogLastTerm());
931 assertEquals(2, followerActorContext.getCommitIndex());
937 public void testHandleAppendEntriesReplyFailure(){
938 logStart("testHandleAppendEntriesReplyFailure");
940 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
942 leader = new Leader(leaderActorContext);
944 // Send initial heartbeat reply with last index.
945 leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
947 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
948 assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
950 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
952 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
954 assertEquals(RaftState.Leader, raftActorBehavior.state());
956 assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
960 public void testHandleAppendEntriesReplySuccess() throws Exception {
961 logStart("testHandleAppendEntriesReplySuccess");
963 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
965 leaderActorContext.setReplicatedLog(
966 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
968 leaderActorContext.setCommitIndex(1);
969 leaderActorContext.setLastApplied(1);
970 leaderActorContext.getTermInformation().update(1, "leader");
972 leader = new Leader(leaderActorContext);
974 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
976 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
978 assertEquals(RaftState.Leader, raftActorBehavior.state());
980 assertEquals(2, leaderActorContext.getCommitIndex());
982 ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching(
983 leaderActor, ApplyLogEntries.class);
985 assertEquals(2, leaderActorContext.getLastApplied());
987 assertEquals(2, applyLogEntries.getToIndex());
989 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
992 assertEquals(1,applyStateList.size());
994 ApplyState applyState = applyStateList.get(0);
996 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1000 public void testHandleAppendEntriesReplyUnknownFollower(){
1001 logStart("testHandleAppendEntriesReplyUnknownFollower");
1003 MockRaftActorContext leaderActorContext = createActorContext();
1005 leader = new Leader(leaderActorContext);
1007 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
1009 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1011 assertEquals(RaftState.Leader, raftActorBehavior.state());
1015 public void testHandleRequestVoteReply(){
1016 logStart("testHandleRequestVoteReply");
1018 MockRaftActorContext leaderActorContext = createActorContext();
1020 leader = new Leader(leaderActorContext);
1022 // Should be a no-op.
1023 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1024 new RequestVoteReply(1, true));
1026 assertEquals(RaftState.Leader, raftActorBehavior.state());
1028 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1030 assertEquals(RaftState.Leader, raftActorBehavior.state());
1034 public void testIsolatedLeaderCheckNoFollowers() {
1035 logStart("testIsolatedLeaderCheckNoFollowers");
1037 MockRaftActorContext leaderActorContext = createActorContext();
1039 leader = new Leader(leaderActorContext);
1040 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1041 Assert.assertTrue(behavior instanceof Leader);
1045 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1046 logStart("testIsolatedLeaderCheckTwoFollowers");
1048 new JavaTestKit(getSystem()) {{
1050 ActorRef followerActor1 = getTestActor();
1051 ActorRef followerActor2 = getTestActor();
1053 MockRaftActorContext leaderActorContext = createActorContext();
1055 Map<String, String> peerAddresses = new HashMap<>();
1056 peerAddresses.put("follower-1", followerActor1.path().toString());
1057 peerAddresses.put("follower-2", followerActor2.path().toString());
1059 leaderActorContext.setPeerAddresses(peerAddresses);
1061 leader = new Leader(leaderActorContext);
1062 leader.stopIsolatedLeaderCheckSchedule();
1064 leader.markFollowerActive("follower-1");
1065 leader.markFollowerActive("follower-2");
1066 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1067 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1068 behavior instanceof Leader);
1070 // kill 1 follower and verify if that got killed
1071 final JavaTestKit probe = new JavaTestKit(getSystem());
1072 probe.watch(followerActor1);
1073 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1074 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1075 assertEquals(termMsg1.getActor(), followerActor1);
1077 leader.markFollowerInActive("follower-1");
1078 leader.markFollowerActive("follower-2");
1079 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1080 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1081 behavior instanceof Leader);
1083 // kill 2nd follower and leader should change to Isolated leader
1084 followerActor2.tell(PoisonPill.getInstance(), null);
1085 probe.watch(followerActor2);
1086 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1087 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1088 assertEquals(termMsg2.getActor(), followerActor2);
1090 leader.markFollowerInActive("follower-2");
1091 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1092 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1093 behavior instanceof IsolatedLeader);
1099 public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1100 logStart("testAppendEntryCallAtEndofAppendEntryReply");
1102 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1104 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1105 //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1106 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1108 leaderActorContext.setConfigParams(configParams);
1110 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1112 followerActorContext.setConfigParams(configParams);
1114 Follower follower = new Follower(followerActorContext);
1115 followerActor.underlyingActor().behavior = follower;
1117 leaderActorContext.getReplicatedLog().removeFrom(0);
1118 leaderActorContext.setCommitIndex(-1);
1119 leaderActorContext.setLastApplied(-1);
1121 followerActorContext.getReplicatedLog().removeFrom(0);
1122 followerActorContext.setCommitIndex(-1);
1123 followerActorContext.setLastApplied(-1);
1125 leader = new Leader(leaderActorContext);
1127 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1128 leaderActor, AppendEntriesReply.class);
1130 leader.handleMessage(followerActor, appendEntriesReply);
1132 // Clear initial heartbeat messages
1134 leaderActor.underlyingActor().clear();
1135 followerActor.underlyingActor().clear();
1138 leaderActorContext.setReplicatedLog(
1139 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1140 leaderActorContext.setCommitIndex(1);
1141 leaderActorContext.setLastApplied(1);
1143 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1144 TimeUnit.MILLISECONDS);
1146 leader.handleMessage(leaderActor, new SendHeartBeat());
1148 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1150 // Should send first log entry
1151 assertEquals(1, appendEntries.getLeaderCommit());
1152 assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1153 assertEquals(-1, appendEntries.getPrevLogIndex());
1155 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1157 assertEquals(1, appendEntriesReply.getLogLastTerm());
1158 assertEquals(0, appendEntriesReply.getLogLastIndex());
1160 followerActor.underlyingActor().clear();
1162 leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1164 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1166 // Should send second log entry
1167 assertEquals(1, appendEntries.getLeaderCommit());
1168 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1173 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1175 private final long electionTimeOutIntervalMillis;
1176 private final int snapshotChunkSize;
1178 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1180 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1181 this.snapshotChunkSize = snapshotChunkSize;
1185 public FiniteDuration getElectionTimeOutInterval() {
1186 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1190 public int getSnapshotChunkSize() {
1191 return snapshotChunkSize;