1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.util.HashMap;
19 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Assert;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftState;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
47 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
48 import scala.concurrent.duration.FiniteDuration;
50 public class LeaderTest extends AbstractRaftActorBehaviorTest {
// Do-nothing actor created on the test ActorSystem; createActorContext() passes it
// to MockRaftActorContext as the actor backing the leader under test.
52 private ActorRef leaderActor =
53 getSystem().actorOf(Props.create(DoNothingActor.class));
// Second do-nothing actor; the tests pass it as the "sender" argument of
// handleMessage() when the identity of the sender does not matter.
54 private ActorRef senderActor =
55 getSystem().actorOf(Props.create(DoNothingActor.class));
/**
 * Hands the Leader a message it has no handler for (the plain string "foo") and
 * checks that the behavior returned by handleMessage() is still a Leader.
 */
58 public void testHandleMessageForUnknownMessage() throws Exception {
59 new JavaTestKit(getSystem()) {{
61 new Leader(createActorContext());
63 // handle message should return the Leader state when it receives an unknown message
65 RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
66 Assert.assertTrue(behavior instanceof Leader);
/**
 * Registers the test kit's own actor as the leader's only peer, delivers a
 * SendHeartBeat to the Leader and expects the peer to receive, within one
 * second, an AppendEntries whose term is 0.
 */
72 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
73 new JavaTestKit(getSystem()) {{
75 new Within(duration("1 seconds")) {
76 protected void run() {
// The test kit actor plays the follower so that ExpectMsg can observe what it receives.
78 ActorRef followerActor = getTestActor();
80 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
82 Map<String, String> peerAddresses = new HashMap<>();
// The follower's actor path is used both as the peer id and as its address.
84 peerAddresses.put(followerActor.path().toString(),
85 followerActor.path().toString());
87 actorContext.setPeerAddresses(peerAddresses);
89 Leader leader = new Leader(actorContext);
90 leader.handleMessage(senderActor, new SendHeartBeat());
93 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
94 // do not put code outside this method, will run afterwards
95 protected String match(Object in) {
// Messages arrive in serializable form; convert back before inspecting the type.
96 Object msg = fromSerializableMessage(in);
97 if (msg instanceof AppendEntries) {
98 if (((AppendEntries)msg).getTerm() == 0) {
106 }.get(); // this extracts the received message
108 assertEquals("match", out);
/**
 * Registers the test kit's own actor as the leader's only peer, hands the
 * Leader a Replicate message carrying a mock log entry with payload "foo",
 * and checks that (a) the returned behavior is still a Leader and (b) the
 * peer receives, within one second, an AppendEntries whose term is 0.
 */
116 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
117 new JavaTestKit(getSystem()) {{
119 new Within(duration("1 seconds")) {
120 protected void run() {
// The test kit actor plays the follower so that ExpectMsg can observe what it receives.
122 ActorRef followerActor = getTestActor();
124 MockRaftActorContext actorContext =
125 (MockRaftActorContext) createActorContext();
127 Map<String, String> peerAddresses = new HashMap<>();
// The follower's actor path is used both as the peer id and as its address.
129 peerAddresses.put(followerActor.path().toString(),
130 followerActor.path().toString());
132 actorContext.setPeerAddresses(peerAddresses);
134 Leader leader = new Leader(actorContext);
// Client actor and identifier are both null: only the replication side effect is under test.
135 RaftActorBehavior raftBehavior = leader
136 .handleMessage(senderActor, new Replicate(null, null,
137 new MockRaftActorContext.MockReplicatedLogEntry(1,
139 new MockRaftActorContext.MockPayload("foo"))
142 // State should not change
143 assertTrue(raftBehavior instanceof Leader);
146 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
147 // do not put code outside this method, will run afterwards
148 protected String match(Object in) {
// Messages arrive in serializable form; convert back before inspecting the type.
149 Object msg = fromSerializableMessage(in);
150 if (msg instanceof AppendEntries) {
151 if (((AppendEntries)msg).getTerm() == 0) {
159 }.get(); // this extracts the received message
161 assertEquals("match", out);
/**
 * Runs a Leader with no peers configured. After a Replicate for the log entry
 * at index 1 (identifier "state-id") the test expects the behavior to remain
 * Leader, the context's commit index to be 1, and the context's actor (the
 * test kit actor) to receive an ApplyState whose identifier is "state-id".
 */
168 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
169 new JavaTestKit(getSystem()) {{
171 new Within(duration("1 seconds")) {
172 protected void run() {
// The test kit actor stands in for the RaftActor so messages the leader sends to it can be observed.
174 ActorRef raftActor = getTestActor();
176 MockRaftActorContext actorContext =
177 new MockRaftActorContext("test", getSystem(), raftActor);
// Clear the context's log, then replace it with one built by the mock builder.
179 actorContext.getReplicatedLog().removeFrom(0);
181 actorContext.setReplicatedLog(
182 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
185 Leader leader = new Leader(actorContext);
186 RaftActorBehavior raftBehavior = leader
187 .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
189 // State should not change
190 assertTrue(raftBehavior instanceof Leader);
192 assertEquals(1, actorContext.getCommitIndex());
195 new ExpectMsg<String>(duration("1 seconds"),
197 // do not put code outside this method, will run afterwards
198 protected String match(Object in) {
199 if (in instanceof ApplyState) {
200 if (((ApplyState) in).getIdentifier().equals("state-id")) {
208 }.get(); // this extracts the received message
210 assertEquals("match", out);
/**
 * Puts the leader part-way through installing a snapshot on its follower
 * (first chunk fetched and chunk index advanced, but no reply recorded) and
 * checks two things:
 * <ol>
 *   <li>a heartbeat still makes the leader send an AppendEntries, and that
 *       AppendEntries carries no entries;</li>
 *   <li>once the chunk is marked as successfully sent, the next heartbeat
 *       makes the leader send an InstallSnapshot whose last-included index
 *       equals the snapshot index.</li>
 * </ol>
 */
218 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
219 new JavaTestKit(getSystem()) {{
// The follower collects every message it receives so the test can query them afterwards.
220 ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
222 Map<String, String> peerAddresses = new HashMap<>();
223 peerAddresses.put(followerActor.path().toString(),
224 followerActor.path().toString());
226 MockRaftActorContext actorContext =
227 (MockRaftActorContext) createActorContext(leaderActor);
228 actorContext.setPeerAddresses(peerAddresses);
// Stand-in state that gets serialized and used as the leader's snapshot.
230 Map<String, String> leadersSnapshot = new HashMap<>();
231 leadersSnapshot.put("1", "A");
232 leadersSnapshot.put("2", "B");
233 leadersSnapshot.put("3", "C");
236 actorContext.getReplicatedLog().removeFrom(0);
238 final int followersLastIndex = 2;
239 final int snapshotIndex = 3;
240 final int newEntryIndex = 4;
241 final int snapshotTerm = 1;
242 final int currentTerm = 2;
244 // set the snapshot variables in replicatedlog
245 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
246 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
247 actorContext.setCommitIndex(followersLastIndex);
248 //set follower timeout to 2 mins, helps during debugging
249 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
251 MockLeader leader = new MockLeader(actorContext);
254 ReplicatedLogImplEntry entry =
255 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
256 new MockRaftActorContext.MockPayload("D"));
258 //update follower timestamp
259 leader.markFollowerActive(followerActor.path().toString());
261 ByteString bs = toByteString(leadersSnapshot);
262 leader.setSnapshot(Optional.of(bs));
263 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
265 //send first chunk and no InstallSnapshotReply received yet
266 leader.getFollowerToSnapshot().getNextChunk();
267 leader.getFollowerToSnapshot().incrementChunkIndex();
269 leader.handleMessage(leaderActor, new SendHeartBeat());
271 AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
272 followerActor, AppendEntries.SERIALIZABLE_CLASS);
274 assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
275 "received", aeproto);
// Convert the collected serializable form back to the AppendEntries message before inspecting it.
277 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
279 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
281 //InstallSnapshotReply received
282 leader.getFollowerToSnapshot().markSendStatus(true);
284 leader.handleMessage(senderActor, new SendHeartBeat());
286 InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
287 MessageCollectorActor.getFirstMatching(followerActor,
288 InstallSnapshot.SERIALIZABLE_CLASS);
290 assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
293 InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
295 assertEquals(snapshotIndex, is.getLastIncludedIndex());
/**
 * Sets the replicated log's snapshot index (3) ahead of the commit index (2),
 * marks the follower active and replicates a new entry at index 4. The test
 * expects the behavior to remain Leader and, within two seconds, at least one
 * InitiateInstallSnapshot among the messages received by the test kit actor.
 */
301 public void testSendAppendEntriesSnapshotScenario() {
302 new JavaTestKit(getSystem()) {{
304 ActorRef followerActor = getTestActor();
306 Map<String, String> peerAddresses = new HashMap<>();
307 peerAddresses.put(followerActor.path().toString(),
308 followerActor.path().toString());
// The context's actor is the test kit's ref, so messages the leader sends to "itself" can be observed.
310 MockRaftActorContext actorContext =
311 (MockRaftActorContext) createActorContext(getRef());
312 actorContext.setPeerAddresses(peerAddresses);
314 Map<String, String> leadersSnapshot = new HashMap<>();
315 leadersSnapshot.put("1", "A");
316 leadersSnapshot.put("2", "B");
317 leadersSnapshot.put("3", "C");
320 actorContext.getReplicatedLog().removeFrom(0);
322 final int followersLastIndex = 2;
323 final int snapshotIndex = 3;
324 final int newEntryIndex = 4;
325 final int snapshotTerm = 1;
326 final int currentTerm = 2;
328 // set the snapshot variables in replicatedlog
329 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
330 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
331 actorContext.setCommitIndex(followersLastIndex);
333 Leader leader = new Leader(actorContext);
336 ReplicatedLogImplEntry entry =
337 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
338 new MockRaftActorContext.MockPayload("D"));
340 //update follower timestamp
341 leader.markFollowerActive(followerActor.path().toString());
343 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
344 RaftActorBehavior raftBehavior = leader.handleMessage(
345 senderActor, new Replicate(null, "state-id", entry));
347 assertTrue(raftBehavior instanceof Leader);
349 // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
350 Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
352 protected Boolean match(Object o) throws Exception {
353 if (o instanceof InitiateInstallSnapshot) {
// OR together the per-message results: one matching message is enough.
360 boolean initiateInitiateInstallSnapshot = false;
361 for (Boolean b: matches) {
362 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
365 assertTrue(initiateInitiateInstallSnapshot);
/**
 * With the leader's snapshot explicitly set to absent, delivers an
 * InitiateInstallSnapshot and checks that the leader's actor receives a
 * CaptureSnapshot flagged as install-snapshot-initiated, carrying
 * last-applied index 3 / term 1 and last index 4 / term 2.
 */
370 public void testInitiateInstallSnapshot() throws Exception {
371 new JavaTestKit(getSystem()) {{
// Local collector actor (shadows the leaderActor field) so the CaptureSnapshot can be retrieved.
373 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
375 ActorRef followerActor = getTestActor();
377 Map<String, String> peerAddresses = new HashMap<>();
378 peerAddresses.put(followerActor.path().toString(),
379 followerActor.path().toString());
382 MockRaftActorContext actorContext =
383 (MockRaftActorContext) createActorContext(leaderActor);
384 actorContext.setPeerAddresses(peerAddresses);
386 Map<String, String> leadersSnapshot = new HashMap<>();
387 leadersSnapshot.put("1", "A");
388 leadersSnapshot.put("2", "B");
389 leadersSnapshot.put("3", "C");
392 actorContext.getReplicatedLog().removeFrom(0);
394 final int followersLastIndex = 2;
395 final int snapshotIndex = 3;
396 final int newEntryIndex = 4;
397 final int snapshotTerm = 1;
398 final int currentTerm = 2;
400 // set the snapshot variables in replicatedlog
401 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
402 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
403 actorContext.setLastApplied(3);
404 actorContext.setCommitIndex(followersLastIndex);
406 Leader leader = new Leader(actorContext);
407 // set the snapshot as absent and check if capture-snapshot is invoked.
408 leader.setSnapshot(Optional.<ByteString>absent());
411 ReplicatedLogImplEntry entry =
412 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
413 new MockRaftActorContext.MockPayload("D"));
// The appended entry (index 4, term 2) is what the last-index / last-term assertions below refer to.
415 actorContext.getReplicatedLog().append(entry);
417 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
418 RaftActorBehavior raftBehavior = leader.handleMessage(
419 leaderActor, new InitiateInstallSnapshot());
421 CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
422 getFirstMatching(leaderActor, CaptureSnapshot.class);
426 assertTrue(cs.isInstallSnapshotInitiated());
427 assertEquals(3, cs.getLastAppliedIndex());
428 assertEquals(1, cs.getLastAppliedTerm());
429 assertEquals(4, cs.getLastIndex());
430 assertEquals(2, cs.getLastTerm());
/**
 * Hands the Leader a SendInstallSnapshot carrying a serialized snapshot and
 * checks that the follower (the test kit actor) receives, within one second,
 * an InstallSnapshot that has non-null data and whose last-included index,
 * last-included term and term equal snapshotIndex, snapshotTerm and
 * currentTerm respectively. Any deviation is reported through the string
 * returned from match(), which must equal "match".
 */
435 public void testInstallSnapshot() {
436 new JavaTestKit(getSystem()) {{
438 ActorRef followerActor = getTestActor();
440 Map<String, String> peerAddresses = new HashMap<>();
441 peerAddresses.put(followerActor.path().toString(),
442 followerActor.path().toString());
444 MockRaftActorContext actorContext =
445 (MockRaftActorContext) createActorContext();
446 actorContext.setPeerAddresses(peerAddresses);
449 Map<String, String> leadersSnapshot = new HashMap<>();
450 leadersSnapshot.put("1", "A");
451 leadersSnapshot.put("2", "B");
452 leadersSnapshot.put("3", "C");
455 actorContext.getReplicatedLog().removeFrom(0);
457 final int followersLastIndex = 2;
458 final int snapshotIndex = 3;
459 final int newEntryIndex = 4;
460 final int snapshotTerm = 1;
461 final int currentTerm = 2;
463 // set the snapshot variables in replicatedlog
464 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
465 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
// Record currentTerm in the context's term information; the term check below compares against it.
466 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
467 actorContext.setCommitIndex(followersLastIndex);
469 Leader leader = new Leader(actorContext);
472 ReplicatedLogImplEntry entry =
473 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
474 new MockRaftActorContext.MockPayload("D"));
476 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
477 new SendInstallSnapshot(toByteString(leadersSnapshot)));
479 assertTrue(raftBehavior instanceof Leader);
481 // check if installsnapshot gets called with the correct values.
483 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
484 // do not put code outside this method, will run afterwards
485 protected String match(Object in) {
486 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
487 InstallSnapshot is = (InstallSnapshot)
488 SerializationUtils.fromSerializable(in);
489 if (is.getData() == null) {
490 return "InstallSnapshot data is null";
492 if (is.getLastIncludedIndex() != snapshotIndex) {
493 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
495 if (is.getLastIncludedTerm() != snapshotTerm) {
496 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
// Report a mismatch only when the terms differ, consistent with the two checks
// above; the previous "==" flagged the expected (equal) case as a failure.
498 if (is.getTerm() != currentTerm) {
499 return is.getTerm() + "!=" + currentTerm;
505 return "message mismatch:" + in.getClass();
508 }.get(); // this extracts the received message
510 assertEquals("match", out);
/**
 * Advances the leader's per-follower snapshot state to its last chunk and
 * then delivers a successful InstallSnapshotReply for that chunk. The test
 * expects the behavior to remain Leader, the follower-to-snapshot map to be
 * empty afterwards, and the follower's log information to show a match index
 * equal to the snapshot index and a next index one past it.
 */
515 public void testHandleInstallSnapshotReplyLastChunk() {
516 new JavaTestKit(getSystem()) {{
518 ActorRef followerActor = getTestActor();
520 Map<String, String> peerAddresses = new HashMap<>();
521 peerAddresses.put(followerActor.path().toString(),
522 followerActor.path().toString());
524 final int followersLastIndex = 2;
525 final int snapshotIndex = 3;
526 final int newEntryIndex = 4;
527 final int snapshotTerm = 1;
528 final int currentTerm = 2;
530 MockRaftActorContext actorContext =
531 (MockRaftActorContext) createActorContext();
532 actorContext.setPeerAddresses(peerAddresses);
533 actorContext.setCommitIndex(followersLastIndex);
535 MockLeader leader = new MockLeader(actorContext);
537 Map<String, String> leadersSnapshot = new HashMap<>();
538 leadersSnapshot.put("1", "A");
539 leadersSnapshot.put("2", "B");
540 leadersSnapshot.put("3", "C");
542 // set the snapshot variables in replicatedlog
544 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
545 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
546 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
548 ByteString bs = toByteString(leadersSnapshot);
549 leader.setSnapshot(Optional.of(bs));
550 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
// Step through the chunks until the current chunk index is the last one.
551 while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
552 leader.getFollowerToSnapshot().getNextChunk();
553 leader.getFollowerToSnapshot().incrementChunkIndex();
557 actorContext.getReplicatedLog().removeFrom(0);
559 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
560 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
561 leader.getFollowerToSnapshot().getChunkIndex(), true));
563 assertTrue(raftBehavior instanceof Leader);
// JUnit's assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
565 assertEquals(0, leader.mapFollowerToSnapshot.size());
566 assertEquals(1, leader.followerToLog.size());
567 assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
568 FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
569 assertEquals(snapshotIndex, fli.getMatchIndex().get());
571 assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
/**
 * Starts a snapshot install, answers the first chunk with an unsuccessful
 * InstallSnapshotReply whose chunk index is -1, then triggers a heartbeat.
 * The test expects the second InstallSnapshot received by the follower to be
 * chunk 1 of 3 again, i.e. the same chunk index as the first message.
 */
576 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
577 new JavaTestKit(getSystem()) {{
// TestActorRef-backed collector, created under the fixed actor name "follower".
579 TestActorRef<MessageCollectorActor> followerActor =
580 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
582 Map<String, String> peerAddresses = new HashMap<>();
583 peerAddresses.put(followerActor.path().toString(),
584 followerActor.path().toString());
586 final int followersLastIndex = 2;
587 final int snapshotIndex = 3;
588 final int snapshotTerm = 1;
589 final int currentTerm = 2;
591 MockRaftActorContext actorContext =
592 (MockRaftActorContext) createActorContext();
// Override the snapshot chunk size; the assertions below expect the snapshot to split into 3 chunks.
594 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
596 public int getSnapshotChunkSize() {
600 actorContext.setPeerAddresses(peerAddresses);
601 actorContext.setCommitIndex(followersLastIndex);
603 MockLeader leader = new MockLeader(actorContext);
605 Map<String, String> leadersSnapshot = new HashMap<>();
606 leadersSnapshot.put("1", "A");
607 leadersSnapshot.put("2", "B");
608 leadersSnapshot.put("3", "C");
610 // set the snapshot variables in replicatedlog
611 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
612 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
613 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
615 ByteString bs = toByteString(leadersSnapshot);
616 leader.setSnapshot(Optional.of(bs));
618 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
// First message collected by the follower must be the first snapshot chunk.
620 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
622 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
624 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
626 assertEquals(1, installSnapshot.getChunkIndex());
627 assertEquals(3, installSnapshot.getTotalChunks());
// Failed reply carrying chunk index -1, followed by a heartbeat to drive the next send.
630 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
632 leader.handleMessage(leaderActor, new SendHeartBeat());
634 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
636 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
638 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
640 assertEquals(1, installSnapshot.getChunkIndex());
641 assertEquals(3, installSnapshot.getTotalChunks());
// Stop the named follower actor at the end of the test.
643 followerActor.tell(PoisonPill.getInstance(), getRef());
/**
 * Starts a snapshot install and checks the last-chunk hash code carried by
 * consecutive chunks: the first InstallSnapshot must carry
 * AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, and after a successful reply
 * for chunk 1 plus a heartbeat, the second InstallSnapshot (chunk 2 of 3)
 * must carry the hash code of the first chunk's data.
 */
648 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
649 new JavaTestKit(getSystem()) {
// TestActorRef-backed collector, created under the fixed actor name "follower".
652 TestActorRef<MessageCollectorActor> followerActor =
653 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
655 Map<String, String> peerAddresses = new HashMap<>();
656 peerAddresses.put(followerActor.path().toString(),
657 followerActor.path().toString());
659 final int followersLastIndex = 2;
660 final int snapshotIndex = 3;
661 final int snapshotTerm = 1;
662 final int currentTerm = 2;
664 MockRaftActorContext actorContext =
665 (MockRaftActorContext) createActorContext();
// Override the snapshot chunk size; the assertions below expect the snapshot to split into 3 chunks.
667 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
669 public int getSnapshotChunkSize() {
673 actorContext.setPeerAddresses(peerAddresses);
674 actorContext.setCommitIndex(followersLastIndex);
676 MockLeader leader = new MockLeader(actorContext);
678 Map<String, String> leadersSnapshot = new HashMap<>();
679 leadersSnapshot.put("1", "A");
680 leadersSnapshot.put("2", "B");
681 leadersSnapshot.put("3", "C");
683 // set the snapshot variables in replicatedlog
684 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
685 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
686 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
688 ByteString bs = toByteString(leadersSnapshot);
689 leader.setSnapshot(Optional.of(bs));
691 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
693 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
695 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
697 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
699 assertEquals(1, installSnapshot.getChunkIndex());
700 assertEquals(3, installSnapshot.getTotalChunks());
701 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
// Remember the hash of the first chunk's data; the next chunk is expected to echo it.
703 int hashCode = installSnapshot.getData().hashCode();
705 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
707 leader.handleMessage(leaderActor, new SendHeartBeat());
// Fixed 100 ms pause before reading the follower's collected messages.
709 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
711 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
713 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
715 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
717 assertEquals(2, installSnapshot.getChunkIndex());
718 assertEquals(3, installSnapshot.getTotalChunks());
719 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
// Stop the named follower actor at the end of the test.
721 followerActor.tell(PoisonPill.getInstance(), getRef());
/**
 * Exercises the leader's follower-to-snapshot chunking directly, without any
 * messaging. The serialized snapshot is walked in steps of 50 bytes; for each
 * step the next chunk is fetched and its size and chunk index are checked,
 * the send is marked successful, and the chunk index is advanced unless the
 * chunk is the last one. Finally the reported total chunk count must equal
 * the number of chunks counted by the loop.
 */
726 public void testFollowerToSnapshotLogic() {
728 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
// Override the snapshot chunk size; the loop below steps through the data 50 bytes at a time.
730 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
732 public int getSnapshotChunkSize() {
737 MockLeader leader = new MockLeader(actorContext);
739 Map<String, String> leadersSnapshot = new HashMap<>();
740 leadersSnapshot.put("1", "A");
741 leadersSnapshot.put("2", "B");
742 leadersSnapshot.put("3", "C");
744 ByteString bs = toByteString(leadersSnapshot);
745 byte[] barray = bs.toByteArray();
747 leader.createFollowerToSnapshot("followerId", bs);
748 assertEquals(bs.size(), barray.length);
751 for (int i=0; i < barray.length; i = i + 50) {
// Guard for the final step, where fewer than 50 bytes remain.
755 if (i + 50 > barray.length) {
759 ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
760 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
761 assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
763 leader.getFollowerToSnapshot().markSendStatus(true);
764 if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
765 leader.getFollowerToSnapshot().incrementChunkIndex();
769 assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
/**
 * Supplies the behavior under test to the shared base-class tests.
 *
 * @param actorContext context the new behavior operates on
 * @return a new Leader bound to {@code actorContext}
 */
773 @Override protected RaftActorBehavior createBehavior(
774 RaftActorContext actorContext) {
775 return new Leader(actorContext);
/**
 * Creates the default context for these tests, backed by the leaderActor field.
 *
 * @return the result of {@code createActorContext(leaderActor)}
 */
778 @Override protected RaftActorContext createActorContext() {
779 return createActorContext(leaderActor);
/**
 * Creates a mock context with the id "test" on the test ActorSystem.
 *
 * @param actorRef actor to associate with the context
 * @return a new MockRaftActorContext
 */
782 protected RaftActorContext createActorContext(ActorRef actorRef) {
783 return new MockRaftActorContext("test", getSystem(), actorRef);
/**
 * Java-serializes the given map and wraps the resulting bytes in a ByteString,
 * for use as snapshot data in the tests. An IOException during serialization
 * fails the running test via Assert.fail.
 *
 * @param state map to serialize
 * @return a ByteString copy of the serialized bytes
 */
786 private ByteString toByteString(Map<String, String> state) {
787 ByteArrayOutputStream b = null;
788 ObjectOutputStream o = null;
791 b = new ByteArrayOutputStream();
792 o = new ObjectOutputStream(b);
793 o.writeObject(state);
// Snapshot the bytes written so far and copy them into an immutable ByteString.
794 byte[] snapshotBytes = b.toByteArray();
795 return ByteString.copyFrom(snapshotBytes);
805 } catch (IOException e) {
806 Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
/**
 * Message-collecting actor that, after recording each received message via
 * the superclass, also passes it to a Raft behavior together with the
 * message's sender.
 *
 * The behavior is held in a static field, so every instance of this actor
 * forwards to whichever behavior was most recently passed to setBehavior().
 */
811 public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
// Shared by all instances; must be set via setBehavior() before messages arrive.
812 private static AbstractRaftActorBehavior behavior;
814 public ForwardMessageToBehaviorActor(){
818 @Override public void onReceive(Object message) throws Exception {
// Record first (superclass), then let the behavior process the message.
819 super.onReceive(message);
820 behavior.handleMessage(sender(), message);
// Installs the behavior that all instances forward to.
823 public static void setBehavior(AbstractRaftActorBehavior behavior){
824 ForwardMessageToBehaviorActor.behavior = behavior;
/**
 * Wires a Leader to a real Follower behavior (through
 * ForwardMessageToBehaviorActor). Both sides are given logs built with the
 * same builder arguments and commit index 1. After a heartbeat the test
 * expects the follower to have received an AppendEntries with leader commit
 * 1, a first log entry at index 1 and previous log index 0, and the leader's
 * actor to have collected an AppendEntriesReply reporting last log index 2
 * and last log term 1.
 */
829 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
830 new JavaTestKit(getSystem()) {{
// Local collector actor (shadows the leaderActor field) so the follower's reply can be retrieved.
832 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
834 MockRaftActorContext leaderActorContext =
835 new MockRaftActorContext("leader", getSystem(), leaderActor);
837 ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
839 MockRaftActorContext followerActorContext =
840 new MockRaftActorContext("follower", getSystem(), followerActor);
842 Follower follower = new Follower(followerActorContext);
// Every message the follower actor receives is handed to this Follower behavior.
844 ForwardMessageToBehaviorActor.setBehavior(follower);
846 Map<String, String> peerAddresses = new HashMap<>();
847 peerAddresses.put(followerActor.path().toString(),
848 followerActor.path().toString());
850 leaderActorContext.setPeerAddresses(peerAddresses);
852 leaderActorContext.getReplicatedLog().removeFrom(0);
855 leaderActorContext.setReplicatedLog(
856 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
858 leaderActorContext.setCommitIndex(1);
860 followerActorContext.getReplicatedLog().removeFrom(0);
862 // follower too has the exact same log entries and has the same commit index
863 followerActorContext.setReplicatedLog(
864 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
866 followerActorContext.setCommitIndex(1);
868 Leader leader = new Leader(leaderActorContext);
869 leader.markFollowerActive(followerActor.path().toString());
871 leader.handleMessage(leaderActor, new SendHeartBeat());
873 AppendEntriesMessages.AppendEntries appendEntries =
874 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
875 .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
877 assertNotNull(appendEntries);
879 assertEquals(1, appendEntries.getLeaderCommit());
880 assertEquals(1, appendEntries.getLogEntries(0).getIndex());
881 assertEquals(0, appendEntries.getPrevLogIndex());
883 AppendEntriesReply appendEntriesReply =
884 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
885 leaderActor, AppendEntriesReply.class);
887 assertNotNull(appendEntriesReply);
889 // follower returns its next index
890 assertEquals(2, appendEntriesReply.getLogLastIndex());
891 assertEquals(1, appendEntriesReply.getLogLastTerm());
/**
 * Same wiring as the commit-index-less-than-last-index test, except that the
 * follower's commit index (2) is ahead of the leader's (1). After a heartbeat
 * the test expects the follower to have received an AppendEntries with leader
 * commit 1, a first log entry at index 1 and previous log index 0, and the
 * leader's actor to have collected an AppendEntriesReply reporting last log
 * index 2 and last log term 1.
 */
898 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
899 new JavaTestKit(getSystem()) {{
// Local collector actor (shadows the leaderActor field) so the follower's reply can be retrieved.
901 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
903 MockRaftActorContext leaderActorContext =
904 new MockRaftActorContext("leader", getSystem(), leaderActor);
906 ActorRef followerActor = getSystem().actorOf(
907 Props.create(ForwardMessageToBehaviorActor.class));
909 MockRaftActorContext followerActorContext =
910 new MockRaftActorContext("follower", getSystem(), followerActor);
912 Follower follower = new Follower(followerActorContext);
// Every message the follower actor receives is handed to this Follower behavior.
914 ForwardMessageToBehaviorActor.setBehavior(follower);
916 Map<String, String> peerAddresses = new HashMap<>();
917 peerAddresses.put(followerActor.path().toString(),
918 followerActor.path().toString());
920 leaderActorContext.setPeerAddresses(peerAddresses);
922 leaderActorContext.getReplicatedLog().removeFrom(0);
924 leaderActorContext.setReplicatedLog(
925 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
927 leaderActorContext.setCommitIndex(1);
929 followerActorContext.getReplicatedLog().removeFrom(0);
931 followerActorContext.setReplicatedLog(
932 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
934 // follower has the same log entries but its commit index > leaders commit index
935 followerActorContext.setCommitIndex(2);
937 Leader leader = new Leader(leaderActorContext);
938 leader.markFollowerActive(followerActor.path().toString());
940 leader.handleMessage(leaderActor, new SendHeartBeat());
942 AppendEntriesMessages.AppendEntries appendEntries =
943 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
944 .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
946 assertNotNull(appendEntries);
948 assertEquals(1, appendEntries.getLeaderCommit());
949 assertEquals(1, appendEntries.getLogEntries(0).getIndex());
950 assertEquals(0, appendEntries.getPrevLogIndex());
952 AppendEntriesReply appendEntriesReply =
953 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
954 leaderActor, AppendEntriesReply.class);
956 assertNotNull(appendEntriesReply);
958 assertEquals(2, appendEntriesReply.getLogLastIndex());
959 assertEquals(1, appendEntriesReply.getLogLastTerm());
/**
 * Passes an unsuccessful AppendEntriesReply from the peer registered as
 * "follower-1" straight to handleAppendEntriesReply() and checks that the
 * returned behavior still reports the Leader state.
 */
965 public void testHandleAppendEntriesReplyFailure(){
966 new JavaTestKit(getSystem()) {
969 ActorRef leaderActor =
970 getSystem().actorOf(Props.create(MessageCollectorActor.class));
972 ActorRef followerActor =
973 getSystem().actorOf(Props.create(MessageCollectorActor.class));
976 MockRaftActorContext leaderActorContext =
977 new MockRaftActorContext("leader", getSystem(), leaderActor);
// Unlike most tests in this class, the peer is keyed by a symbolic id rather than its actor path.
979 Map<String, String> peerAddresses = new HashMap<>();
980 peerAddresses.put("follower-1",
981 followerActor.path().toString());
983 leaderActorContext.setPeerAddresses(peerAddresses);
985 Leader leader = new Leader(leaderActorContext);
// Third constructor argument (false) marks the reply as unsuccessful.
987 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
989 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
991 assertEquals(RaftState.Leader, raftActorBehavior.state());
/**
 * Passes a successful AppendEntriesReply from "follower-1" to
 * handleAppendEntriesReply() on a leader whose commit index and last-applied
 * index both start at 1. The test expects the behavior to stay in the Leader
 * state, the commit index and last-applied index to advance to 2, an
 * ApplyLogEntries with to-index 2 to reach the leader's actor, and exactly
 * one matching apply-state message whose log entry has index 2.
 */
997 public void testHandleAppendEntriesReplySuccess() throws Exception {
998 new JavaTestKit(getSystem()) {
// Collector actor so the apply messages sent to the leader's actor can be retrieved.
1001 ActorRef leaderActor =
1002 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1004 ActorRef followerActor =
1005 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1008 MockRaftActorContext leaderActorContext =
1009 new MockRaftActorContext("leader", getSystem(), leaderActor);
1011 leaderActorContext.setReplicatedLog(
1012 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1014 Map<String, String> peerAddresses = new HashMap<>();
1015 peerAddresses.put("follower-1",
1016 followerActor.path().toString());
1018 leaderActorContext.setPeerAddresses(peerAddresses);
1019 leaderActorContext.setCommitIndex(1);
1020 leaderActorContext.setLastApplied(1);
1021 leaderActorContext.getTermInformation().update(1, "leader");
1023 Leader leader = new Leader(leaderActorContext);
// Third constructor argument (true) marks the reply as successful.
1025 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1027 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1029 assertEquals(RaftState.Leader, raftActorBehavior.state());
1031 assertEquals(2, leaderActorContext.getCommitIndex());
1033 ApplyLogEntries applyLogEntries =
1034 (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
1035 ApplyLogEntries.class);
1037 assertNotNull(applyLogEntries);
1039 assertEquals(2, leaderActorContext.getLastApplied());
1041 assertEquals(2, applyLogEntries.getToIndex());
1043 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1046 assertEquals(1,applyStateList.size());
1048 ApplyState applyState = (ApplyState) applyStateList.get(0);
1050 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1056 public void testHandleAppendEntriesReplyUnknownFollower(){
1057 new JavaTestKit(getSystem()) {
1060 ActorRef leaderActor =
1061 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1063 MockRaftActorContext leaderActorContext =
1064 new MockRaftActorContext("leader", getSystem(), leaderActor);
1066 Leader leader = new Leader(leaderActorContext);
1068 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1070 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1072 assertEquals(RaftState.Leader, raftActorBehavior.state());
1078 public void testHandleRequestVoteReply(){
1079 new JavaTestKit(getSystem()) {
1082 ActorRef leaderActor =
1083 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1085 MockRaftActorContext leaderActorContext =
1086 new MockRaftActorContext("leader", getSystem(), leaderActor);
1088 Leader leader = new Leader(leaderActorContext);
1090 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1092 assertEquals(RaftState.Leader, raftActorBehavior.state());
1094 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1096 assertEquals(RaftState.Leader, raftActorBehavior.state());
1101 public void testIsolatedLeaderCheckNoFollowers() {
1102 new JavaTestKit(getSystem()) {{
1103 ActorRef leaderActor = getTestActor();
1105 MockRaftActorContext leaderActorContext =
1106 new MockRaftActorContext("leader", getSystem(), leaderActor);
1108 Map<String, String> peerAddresses = new HashMap<>();
1109 leaderActorContext.setPeerAddresses(peerAddresses);
1111 Leader leader = new Leader(leaderActorContext);
1112 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1113 Assert.assertTrue(behavior instanceof Leader);
1118 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1119 new JavaTestKit(getSystem()) {{
1121 ActorRef followerActor1 = getTestActor();
1122 ActorRef followerActor2 = getTestActor();
1124 MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1126 Map<String, String> peerAddresses = new HashMap<>();
1127 peerAddresses.put("follower-1", followerActor1.path().toString());
1128 peerAddresses.put("follower-2", followerActor2.path().toString());
1130 leaderActorContext.setPeerAddresses(peerAddresses);
1132 Leader leader = new Leader(leaderActorContext);
1133 leader.stopIsolatedLeaderCheckSchedule();
1135 leader.markFollowerActive("follower-1");
1136 leader.markFollowerActive("follower-2");
1137 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1138 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1139 behavior instanceof Leader);
1141 // kill 1 follower and verify if that got killed
1142 final JavaTestKit probe = new JavaTestKit(getSystem());
1143 probe.watch(followerActor1);
1144 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1145 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1146 assertEquals(termMsg1.getActor(), followerActor1);
1148 leader.markFollowerInActive("follower-1");
1149 leader.markFollowerActive("follower-2");
1150 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1151 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1152 behavior instanceof Leader);
1154 // kill 2nd follower and leader should change to Isolated leader
1155 followerActor2.tell(PoisonPill.getInstance(), null);
1156 probe.watch(followerActor2);
1157 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1158 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1159 assertEquals(termMsg2.getActor(), followerActor2);
1161 leader.markFollowerInActive("follower-2");
1162 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1163 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1164 behavior instanceof IsolatedLeader);
1169 class MockLeader extends Leader {
1171 FollowerToSnapshot fts;
1173 public MockLeader(RaftActorContext context){
1177 public FollowerToSnapshot getFollowerToSnapshot() {
1181 public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1182 fts = new FollowerToSnapshot(bs);
1183 mapFollowerToSnapshot.put(followerId, fts);
1188 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1190 private long electionTimeOutIntervalMillis;
1191 private int snapshotChunkSize;
1193 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1195 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1196 this.snapshotChunkSize = snapshotChunkSize;
1200 public FiniteDuration getElectionTimeOutInterval() {
1201 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1205 public int getSnapshotChunkSize() {
1206 return snapshotChunkSize;