1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.util.HashMap;
19 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Assert;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftState;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
47 import scala.concurrent.duration.FiniteDuration;
49 public class LeaderTest extends AbstractRaftActorBehaviorTest {
51 private final ActorRef leaderActor =
52 getSystem().actorOf(Props.create(DoNothingActor.class));
53 private final ActorRef senderActor =
54 getSystem().actorOf(Props.create(DoNothingActor.class));
57 public void testHandleMessageForUnknownMessage() throws Exception {
58 new JavaTestKit(getSystem()) {{
60 new Leader(createActorContext());
62 // handle message should return the Leader state when it receives an
64 RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
65 Assert.assertTrue(behavior instanceof Leader);
70 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
71 new JavaTestKit(getSystem()) {{
73 new Within(duration("1 seconds")) {
75 protected void run() {
77 ActorRef followerActor = getTestActor();
79 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
81 Map<String, String> peerAddresses = new HashMap<>();
83 peerAddresses.put(followerActor.path().toString(),
84 followerActor.path().toString());
86 actorContext.setPeerAddresses(peerAddresses);
88 Leader leader = new Leader(actorContext);
89 leader.handleMessage(senderActor, new SendHeartBeat());
92 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
93 // do not put code outside this method, will run afterwards
95 protected String match(Object in) {
96 Object msg = fromSerializableMessage(in);
97 if (msg instanceof AppendEntries) {
98 if (((AppendEntries)msg).getTerm() == 0) {
106 }.get(); // this extracts the received message
108 assertEquals("match", out);
116 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
117 new JavaTestKit(getSystem()) {{
119 new Within(duration("1 seconds")) {
121 protected void run() {
123 ActorRef followerActor = getTestActor();
125 MockRaftActorContext actorContext =
126 (MockRaftActorContext) createActorContext();
128 Map<String, String> peerAddresses = new HashMap<>();
130 peerAddresses.put(followerActor.path().toString(),
131 followerActor.path().toString());
133 actorContext.setPeerAddresses(peerAddresses);
135 Leader leader = new Leader(actorContext);
136 RaftActorBehavior raftBehavior = leader
137 .handleMessage(senderActor, new Replicate(null, null,
138 new MockRaftActorContext.MockReplicatedLogEntry(1,
140 new MockRaftActorContext.MockPayload("foo"))
143 // State should not change
144 assertTrue(raftBehavior instanceof Leader);
147 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
148 // do not put code outside this method, will run afterwards
150 protected String match(Object in) {
151 Object msg = fromSerializableMessage(in);
152 if (msg instanceof AppendEntries) {
153 if (((AppendEntries)msg).getTerm() == 0) {
161 }.get(); // this extracts the received message
163 assertEquals("match", out);
170 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
171 new JavaTestKit(getSystem()) {{
173 new Within(duration("1 seconds")) {
175 protected void run() {
177 ActorRef raftActor = getTestActor();
179 MockRaftActorContext actorContext =
180 new MockRaftActorContext("test", getSystem(), raftActor);
182 actorContext.getReplicatedLog().removeFrom(0);
184 actorContext.setReplicatedLog(
185 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
188 Leader leader = new Leader(actorContext);
189 RaftActorBehavior raftBehavior = leader
190 .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
192 // State should not change
193 assertTrue(raftBehavior instanceof Leader);
195 assertEquals(1, actorContext.getCommitIndex());
198 new ExpectMsg<String>(duration("1 seconds"),
200 // do not put code outside this method, will run afterwards
202 protected String match(Object in) {
203 if (in instanceof ApplyState) {
204 if (((ApplyState) in).getIdentifier().equals("state-id")) {
212 }.get(); // this extracts the received message
214 assertEquals("match", out);
222 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
223 new JavaTestKit(getSystem()) {{
224 ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
226 Map<String, String> peerAddresses = new HashMap<>();
227 peerAddresses.put(followerActor.path().toString(),
228 followerActor.path().toString());
230 MockRaftActorContext actorContext =
231 (MockRaftActorContext) createActorContext(leaderActor);
232 actorContext.setPeerAddresses(peerAddresses);
234 Map<String, String> leadersSnapshot = new HashMap<>();
235 leadersSnapshot.put("1", "A");
236 leadersSnapshot.put("2", "B");
237 leadersSnapshot.put("3", "C");
240 actorContext.getReplicatedLog().removeFrom(0);
242 final int followersLastIndex = 2;
243 final int snapshotIndex = 3;
244 final int newEntryIndex = 4;
245 final int snapshotTerm = 1;
246 final int currentTerm = 2;
248 // set the snapshot variables in replicatedlog
249 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
250 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
251 actorContext.setCommitIndex(followersLastIndex);
252 //set follower timeout to 2 mins, helps during debugging
253 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
255 MockLeader leader = new MockLeader(actorContext);
258 ReplicatedLogImplEntry entry =
259 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
260 new MockRaftActorContext.MockPayload("D"));
262 //update follower timestamp
263 leader.markFollowerActive(followerActor.path().toString());
265 ByteString bs = toByteString(leadersSnapshot);
266 leader.setSnapshot(Optional.of(bs));
267 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
269 //send first chunk and no InstallSnapshotReply received yet
270 leader.getFollowerToSnapshot().getNextChunk();
271 leader.getFollowerToSnapshot().incrementChunkIndex();
273 leader.handleMessage(leaderActor, new SendHeartBeat());
275 AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
276 followerActor, AppendEntries.class);
278 assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
279 "received", aeproto);
281 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
283 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
285 //InstallSnapshotReply received
286 leader.getFollowerToSnapshot().markSendStatus(true);
288 leader.handleMessage(senderActor, new SendHeartBeat());
290 InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
291 MessageCollectorActor.getFirstMatching(followerActor,
292 InstallSnapshot.SERIALIZABLE_CLASS);
294 assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
297 InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
299 assertEquals(snapshotIndex, is.getLastIncludedIndex());
305 public void testSendAppendEntriesSnapshotScenario() {
306 new JavaTestKit(getSystem()) {{
308 ActorRef followerActor = getTestActor();
310 Map<String, String> peerAddresses = new HashMap<>();
311 peerAddresses.put(followerActor.path().toString(),
312 followerActor.path().toString());
314 MockRaftActorContext actorContext =
315 (MockRaftActorContext) createActorContext(getRef());
316 actorContext.setPeerAddresses(peerAddresses);
318 Map<String, String> leadersSnapshot = new HashMap<>();
319 leadersSnapshot.put("1", "A");
320 leadersSnapshot.put("2", "B");
321 leadersSnapshot.put("3", "C");
324 actorContext.getReplicatedLog().removeFrom(0);
326 final int followersLastIndex = 2;
327 final int snapshotIndex = 3;
328 final int newEntryIndex = 4;
329 final int snapshotTerm = 1;
330 final int currentTerm = 2;
332 // set the snapshot variables in replicatedlog
333 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
334 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
335 actorContext.setCommitIndex(followersLastIndex);
337 Leader leader = new Leader(actorContext);
340 ReplicatedLogImplEntry entry =
341 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
342 new MockRaftActorContext.MockPayload("D"));
344 //update follower timestamp
345 leader.markFollowerActive(followerActor.path().toString());
347 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
348 RaftActorBehavior raftBehavior = leader.handleMessage(
349 senderActor, new Replicate(null, "state-id", entry));
351 assertTrue(raftBehavior instanceof Leader);
353 // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
354 Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
356 protected Boolean match(Object o) throws Exception {
357 if (o instanceof InitiateInstallSnapshot) {
364 boolean initiateInitiateInstallSnapshot = false;
365 for (Boolean b: matches) {
366 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
369 assertTrue(initiateInitiateInstallSnapshot);
374 public void testInitiateInstallSnapshot() throws Exception {
375 new JavaTestKit(getSystem()) {{
377 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
379 ActorRef followerActor = getTestActor();
381 Map<String, String> peerAddresses = new HashMap<>();
382 peerAddresses.put(followerActor.path().toString(),
383 followerActor.path().toString());
386 MockRaftActorContext actorContext =
387 (MockRaftActorContext) createActorContext(leaderActor);
388 actorContext.setPeerAddresses(peerAddresses);
390 Map<String, String> leadersSnapshot = new HashMap<>();
391 leadersSnapshot.put("1", "A");
392 leadersSnapshot.put("2", "B");
393 leadersSnapshot.put("3", "C");
396 actorContext.getReplicatedLog().removeFrom(0);
398 final int followersLastIndex = 2;
399 final int snapshotIndex = 3;
400 final int newEntryIndex = 4;
401 final int snapshotTerm = 1;
402 final int currentTerm = 2;
404 // set the snapshot variables in replicatedlog
405 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
406 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
407 actorContext.setLastApplied(3);
408 actorContext.setCommitIndex(followersLastIndex);
410 Leader leader = new Leader(actorContext);
411 // set the snapshot as absent and check if capture-snapshot is invoked.
412 leader.setSnapshot(Optional.<ByteString>absent());
415 ReplicatedLogImplEntry entry =
416 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
417 new MockRaftActorContext.MockPayload("D"));
419 actorContext.getReplicatedLog().append(entry);
421 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
422 RaftActorBehavior raftBehavior = leader.handleMessage(
423 leaderActor, new InitiateInstallSnapshot());
425 CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
426 getFirstMatching(leaderActor, CaptureSnapshot.class);
430 assertTrue(cs.isInstallSnapshotInitiated());
431 assertEquals(3, cs.getLastAppliedIndex());
432 assertEquals(1, cs.getLastAppliedTerm());
433 assertEquals(4, cs.getLastIndex());
434 assertEquals(2, cs.getLastTerm());
439 public void testInstallSnapshot() {
440 new JavaTestKit(getSystem()) {{
442 ActorRef followerActor = getTestActor();
444 Map<String, String> peerAddresses = new HashMap<>();
445 peerAddresses.put(followerActor.path().toString(),
446 followerActor.path().toString());
448 MockRaftActorContext actorContext =
449 (MockRaftActorContext) createActorContext();
450 actorContext.setPeerAddresses(peerAddresses);
453 Map<String, String> leadersSnapshot = new HashMap<>();
454 leadersSnapshot.put("1", "A");
455 leadersSnapshot.put("2", "B");
456 leadersSnapshot.put("3", "C");
459 actorContext.getReplicatedLog().removeFrom(0);
461 final int followersLastIndex = 2;
462 final int snapshotIndex = 3;
463 final int newEntryIndex = 4;
464 final int snapshotTerm = 1;
465 final int currentTerm = 2;
467 // set the snapshot variables in replicatedlog
468 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
469 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
470 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
471 actorContext.setCommitIndex(followersLastIndex);
473 Leader leader = new Leader(actorContext);
476 ReplicatedLogImplEntry entry =
477 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
478 new MockRaftActorContext.MockPayload("D"));
480 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
481 new SendInstallSnapshot(toByteString(leadersSnapshot)));
483 assertTrue(raftBehavior instanceof Leader);
485 // check if installsnapshot gets called with the correct values.
487 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
488 // do not put code outside this method, will run afterwards
490 protected String match(Object in) {
491 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
492 InstallSnapshot is = (InstallSnapshot)
493 SerializationUtils.fromSerializable(in);
494 if (is.getData() == null) {
495 return "InstallSnapshot data is null";
497 if (is.getLastIncludedIndex() != snapshotIndex) {
498 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
500 if (is.getLastIncludedTerm() != snapshotTerm) {
501 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
503 if (is.getTerm() == currentTerm) {
504 return is.getTerm() + "!=" + currentTerm;
510 return "message mismatch:" + in.getClass();
513 }.get(); // this extracts the received message
515 assertEquals("match", out);
520 public void testHandleInstallSnapshotReplyLastChunk() {
521 new JavaTestKit(getSystem()) {{
523 ActorRef followerActor = getTestActor();
525 Map<String, String> peerAddresses = new HashMap<>();
526 peerAddresses.put(followerActor.path().toString(),
527 followerActor.path().toString());
529 final int followersLastIndex = 2;
530 final int snapshotIndex = 3;
531 final int newEntryIndex = 4;
532 final int snapshotTerm = 1;
533 final int currentTerm = 2;
535 MockRaftActorContext actorContext =
536 (MockRaftActorContext) createActorContext();
537 actorContext.setPeerAddresses(peerAddresses);
538 actorContext.setCommitIndex(followersLastIndex);
540 MockLeader leader = new MockLeader(actorContext);
542 Map<String, String> leadersSnapshot = new HashMap<>();
543 leadersSnapshot.put("1", "A");
544 leadersSnapshot.put("2", "B");
545 leadersSnapshot.put("3", "C");
547 // set the snapshot variables in replicatedlog
549 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
550 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
551 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
553 ByteString bs = toByteString(leadersSnapshot);
554 leader.setSnapshot(Optional.of(bs));
555 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
556 while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
557 leader.getFollowerToSnapshot().getNextChunk();
558 leader.getFollowerToSnapshot().incrementChunkIndex();
562 actorContext.getReplicatedLog().removeFrom(0);
564 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
565 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
566 leader.getFollowerToSnapshot().getChunkIndex(), true));
568 assertTrue(raftBehavior instanceof Leader);
570 assertEquals(0, leader.followerSnapshotSize());
571 assertEquals(1, leader.followerLogSize());
572 assertNotNull(leader.getFollower(followerActor.path().toString()));
573 FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
574 assertEquals(snapshotIndex, fli.getMatchIndex());
575 assertEquals(snapshotIndex, fli.getMatchIndex());
576 assertEquals(snapshotIndex + 1, fli.getNextIndex());
581 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
582 new JavaTestKit(getSystem()) {{
584 TestActorRef<MessageCollectorActor> followerActor =
585 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
587 Map<String, String> peerAddresses = new HashMap<>();
588 peerAddresses.put(followerActor.path().toString(),
589 followerActor.path().toString());
591 final int followersLastIndex = 2;
592 final int snapshotIndex = 3;
593 final int snapshotTerm = 1;
594 final int currentTerm = 2;
596 MockRaftActorContext actorContext =
597 (MockRaftActorContext) createActorContext();
599 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
601 public int getSnapshotChunkSize() {
605 actorContext.setPeerAddresses(peerAddresses);
606 actorContext.setCommitIndex(followersLastIndex);
608 MockLeader leader = new MockLeader(actorContext);
610 Map<String, String> leadersSnapshot = new HashMap<>();
611 leadersSnapshot.put("1", "A");
612 leadersSnapshot.put("2", "B");
613 leadersSnapshot.put("3", "C");
615 // set the snapshot variables in replicatedlog
616 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
617 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
618 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
620 ByteString bs = toByteString(leadersSnapshot);
621 leader.setSnapshot(Optional.of(bs));
623 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
625 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
627 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
629 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
631 assertEquals(1, installSnapshot.getChunkIndex());
632 assertEquals(3, installSnapshot.getTotalChunks());
635 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
637 leader.handleMessage(leaderActor, new SendHeartBeat());
639 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
641 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
643 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
645 assertEquals(1, installSnapshot.getChunkIndex());
646 assertEquals(3, installSnapshot.getTotalChunks());
648 followerActor.tell(PoisonPill.getInstance(), getRef());
653 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
654 new JavaTestKit(getSystem()) {
657 TestActorRef<MessageCollectorActor> followerActor =
658 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
660 Map<String, String> peerAddresses = new HashMap<>();
661 peerAddresses.put(followerActor.path().toString(),
662 followerActor.path().toString());
664 final int followersLastIndex = 2;
665 final int snapshotIndex = 3;
666 final int snapshotTerm = 1;
667 final int currentTerm = 2;
669 MockRaftActorContext actorContext =
670 (MockRaftActorContext) createActorContext();
672 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
674 public int getSnapshotChunkSize() {
678 actorContext.setPeerAddresses(peerAddresses);
679 actorContext.setCommitIndex(followersLastIndex);
681 MockLeader leader = new MockLeader(actorContext);
683 Map<String, String> leadersSnapshot = new HashMap<>();
684 leadersSnapshot.put("1", "A");
685 leadersSnapshot.put("2", "B");
686 leadersSnapshot.put("3", "C");
688 // set the snapshot variables in replicatedlog
689 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
690 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
691 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
693 ByteString bs = toByteString(leadersSnapshot);
694 leader.setSnapshot(Optional.of(bs));
696 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
698 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
700 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
702 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
704 assertEquals(1, installSnapshot.getChunkIndex());
705 assertEquals(3, installSnapshot.getTotalChunks());
706 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
708 int hashCode = installSnapshot.getData().hashCode();
710 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
712 leader.handleMessage(leaderActor, new SendHeartBeat());
714 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
716 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
718 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
720 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
722 assertEquals(2, installSnapshot.getChunkIndex());
723 assertEquals(3, installSnapshot.getTotalChunks());
724 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
726 followerActor.tell(PoisonPill.getInstance(), getRef());
731 public void testFollowerToSnapshotLogic() {
733 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
735 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
737 public int getSnapshotChunkSize() {
742 MockLeader leader = new MockLeader(actorContext);
744 Map<String, String> leadersSnapshot = new HashMap<>();
745 leadersSnapshot.put("1", "A");
746 leadersSnapshot.put("2", "B");
747 leadersSnapshot.put("3", "C");
749 ByteString bs = toByteString(leadersSnapshot);
750 byte[] barray = bs.toByteArray();
752 leader.createFollowerToSnapshot("followerId", bs);
753 assertEquals(bs.size(), barray.length);
756 for (int i=0; i < barray.length; i = i + 50) {
760 if (i + 50 > barray.length) {
764 ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
765 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
766 assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
768 leader.getFollowerToSnapshot().markSendStatus(true);
769 if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
770 leader.getFollowerToSnapshot().incrementChunkIndex();
774 assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
778 @Override protected RaftActorBehavior createBehavior(
779 RaftActorContext actorContext) {
780 return new Leader(actorContext);
783 @Override protected RaftActorContext createActorContext() {
784 return createActorContext(leaderActor);
788 protected RaftActorContext createActorContext(ActorRef actorRef) {
789 return new MockRaftActorContext("test", getSystem(), actorRef);
792 private ByteString toByteString(Map<String, String> state) {
793 ByteArrayOutputStream b = null;
794 ObjectOutputStream o = null;
797 b = new ByteArrayOutputStream();
798 o = new ObjectOutputStream(b);
799 o.writeObject(state);
800 byte[] snapshotBytes = b.toByteArray();
801 return ByteString.copyFrom(snapshotBytes);
811 } catch (IOException e) {
812 Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
817 public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
818 private static AbstractRaftActorBehavior behavior;
820 public ForwardMessageToBehaviorActor(){
824 @Override public void onReceive(Object message) throws Exception {
825 super.onReceive(message);
826 behavior.handleMessage(sender(), message);
829 public static void setBehavior(AbstractRaftActorBehavior behavior){
830 ForwardMessageToBehaviorActor.behavior = behavior;
835 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
836 new JavaTestKit(getSystem()) {{
838 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
840 MockRaftActorContext leaderActorContext =
841 new MockRaftActorContext("leader", getSystem(), leaderActor);
843 ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
845 MockRaftActorContext followerActorContext =
846 new MockRaftActorContext("follower", getSystem(), followerActor);
848 Follower follower = new Follower(followerActorContext);
850 ForwardMessageToBehaviorActor.setBehavior(follower);
852 Map<String, String> peerAddresses = new HashMap<>();
853 peerAddresses.put(followerActor.path().toString(),
854 followerActor.path().toString());
856 leaderActorContext.setPeerAddresses(peerAddresses);
858 leaderActorContext.getReplicatedLog().removeFrom(0);
861 leaderActorContext.setReplicatedLog(
862 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
864 leaderActorContext.setCommitIndex(1);
866 followerActorContext.getReplicatedLog().removeFrom(0);
868 // follower too has the exact same log entries and has the same commit index
869 followerActorContext.setReplicatedLog(
870 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
872 followerActorContext.setCommitIndex(1);
874 Leader leader = new Leader(leaderActorContext);
875 leader.markFollowerActive(followerActor.path().toString());
877 leader.handleMessage(leaderActor, new SendHeartBeat());
879 AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
880 .getFirstMatching(followerActor, AppendEntries.class);
882 assertNotNull(appendEntries);
884 assertEquals(1, appendEntries.getLeaderCommit());
885 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
886 assertEquals(0, appendEntries.getPrevLogIndex());
888 AppendEntriesReply appendEntriesReply =
889 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
890 leaderActor, AppendEntriesReply.class);
892 assertNotNull(appendEntriesReply);
894 // follower returns its next index
895 assertEquals(2, appendEntriesReply.getLogLastIndex());
896 assertEquals(1, appendEntriesReply.getLogLastTerm());
903 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
904 new JavaTestKit(getSystem()) {{
906 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
908 MockRaftActorContext leaderActorContext =
909 new MockRaftActorContext("leader", getSystem(), leaderActor);
911 ActorRef followerActor = getSystem().actorOf(
912 Props.create(ForwardMessageToBehaviorActor.class));
914 MockRaftActorContext followerActorContext =
915 new MockRaftActorContext("follower", getSystem(), followerActor);
917 Follower follower = new Follower(followerActorContext);
919 ForwardMessageToBehaviorActor.setBehavior(follower);
921 Map<String, String> peerAddresses = new HashMap<>();
922 peerAddresses.put(followerActor.path().toString(),
923 followerActor.path().toString());
925 leaderActorContext.setPeerAddresses(peerAddresses);
927 leaderActorContext.getReplicatedLog().removeFrom(0);
929 leaderActorContext.setReplicatedLog(
930 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
932 leaderActorContext.setCommitIndex(1);
934 followerActorContext.getReplicatedLog().removeFrom(0);
936 followerActorContext.setReplicatedLog(
937 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
939 // follower has the same log entries but its commit index > leaders commit index
940 followerActorContext.setCommitIndex(2);
942 Leader leader = new Leader(leaderActorContext);
943 leader.markFollowerActive(followerActor.path().toString());
945 leader.handleMessage(leaderActor, new SendHeartBeat());
947 AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
948 .getFirstMatching(followerActor, AppendEntries.class);
950 assertNotNull(appendEntries);
952 assertEquals(1, appendEntries.getLeaderCommit());
953 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
954 assertEquals(0, appendEntries.getPrevLogIndex());
956 AppendEntriesReply appendEntriesReply =
957 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
958 leaderActor, AppendEntriesReply.class);
960 assertNotNull(appendEntriesReply);
962 assertEquals(2, appendEntriesReply.getLogLastIndex());
963 assertEquals(1, appendEntriesReply.getLogLastTerm());
969 public void testHandleAppendEntriesReplyFailure(){
970 new JavaTestKit(getSystem()) {
973 ActorRef leaderActor =
974 getSystem().actorOf(Props.create(MessageCollectorActor.class));
976 ActorRef followerActor =
977 getSystem().actorOf(Props.create(MessageCollectorActor.class));
980 MockRaftActorContext leaderActorContext =
981 new MockRaftActorContext("leader", getSystem(), leaderActor);
983 Map<String, String> peerAddresses = new HashMap<>();
984 peerAddresses.put("follower-1",
985 followerActor.path().toString());
987 leaderActorContext.setPeerAddresses(peerAddresses);
989 Leader leader = new Leader(leaderActorContext);
991 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
993 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
995 assertEquals(RaftState.Leader, raftActorBehavior.state());
1001 public void testHandleAppendEntriesReplySuccess() throws Exception {
1002 new JavaTestKit(getSystem()) {
1005 ActorRef leaderActor =
1006 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1008 ActorRef followerActor =
1009 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1012 MockRaftActorContext leaderActorContext =
1013 new MockRaftActorContext("leader", getSystem(), leaderActor);
1015 leaderActorContext.setReplicatedLog(
1016 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1018 Map<String, String> peerAddresses = new HashMap<>();
1019 peerAddresses.put("follower-1",
1020 followerActor.path().toString());
1022 leaderActorContext.setPeerAddresses(peerAddresses);
1023 leaderActorContext.setCommitIndex(1);
1024 leaderActorContext.setLastApplied(1);
1025 leaderActorContext.getTermInformation().update(1, "leader");
1027 Leader leader = new Leader(leaderActorContext);
1029 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1031 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1033 assertEquals(RaftState.Leader, raftActorBehavior.state());
1035 assertEquals(2, leaderActorContext.getCommitIndex());
1037 ApplyLogEntries applyLogEntries =
1038 (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
1039 ApplyLogEntries.class);
1041 assertNotNull(applyLogEntries);
1043 assertEquals(2, leaderActorContext.getLastApplied());
1045 assertEquals(2, applyLogEntries.getToIndex());
1047 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1050 assertEquals(1,applyStateList.size());
1052 ApplyState applyState = (ApplyState) applyStateList.get(0);
1054 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1060 public void testHandleAppendEntriesReplyUnknownFollower(){
1061 new JavaTestKit(getSystem()) {
1064 ActorRef leaderActor =
1065 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1067 MockRaftActorContext leaderActorContext =
1068 new MockRaftActorContext("leader", getSystem(), leaderActor);
1070 Leader leader = new Leader(leaderActorContext);
1072 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1074 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1076 assertEquals(RaftState.Leader, raftActorBehavior.state());
1082 public void testHandleRequestVoteReply(){
1083 new JavaTestKit(getSystem()) {
1086 ActorRef leaderActor =
1087 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1089 MockRaftActorContext leaderActorContext =
1090 new MockRaftActorContext("leader", getSystem(), leaderActor);
1092 Leader leader = new Leader(leaderActorContext);
1094 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1096 assertEquals(RaftState.Leader, raftActorBehavior.state());
1098 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1100 assertEquals(RaftState.Leader, raftActorBehavior.state());
1105 public void testIsolatedLeaderCheckNoFollowers() {
1106 new JavaTestKit(getSystem()) {{
1107 ActorRef leaderActor = getTestActor();
1109 MockRaftActorContext leaderActorContext =
1110 new MockRaftActorContext("leader", getSystem(), leaderActor);
1112 Map<String, String> peerAddresses = new HashMap<>();
1113 leaderActorContext.setPeerAddresses(peerAddresses);
1115 Leader leader = new Leader(leaderActorContext);
1116 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1117 Assert.assertTrue(behavior instanceof Leader);
1122 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1123 new JavaTestKit(getSystem()) {{
1125 ActorRef followerActor1 = getTestActor();
1126 ActorRef followerActor2 = getTestActor();
1128 MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1130 Map<String, String> peerAddresses = new HashMap<>();
1131 peerAddresses.put("follower-1", followerActor1.path().toString());
1132 peerAddresses.put("follower-2", followerActor2.path().toString());
1134 leaderActorContext.setPeerAddresses(peerAddresses);
1136 Leader leader = new Leader(leaderActorContext);
1137 leader.stopIsolatedLeaderCheckSchedule();
1139 leader.markFollowerActive("follower-1");
1140 leader.markFollowerActive("follower-2");
1141 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1142 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1143 behavior instanceof Leader);
1145 // kill 1 follower and verify if that got killed
1146 final JavaTestKit probe = new JavaTestKit(getSystem());
1147 probe.watch(followerActor1);
1148 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1149 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1150 assertEquals(termMsg1.getActor(), followerActor1);
1152 leader.markFollowerInActive("follower-1");
1153 leader.markFollowerActive("follower-2");
1154 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1155 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1156 behavior instanceof Leader);
1158 // kill 2nd follower and leader should change to Isolated leader
1159 followerActor2.tell(PoisonPill.getInstance(), null);
1160 probe.watch(followerActor2);
1161 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1162 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1163 assertEquals(termMsg2.getActor(), followerActor2);
1165 leader.markFollowerInActive("follower-2");
1166 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1167 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1168 behavior instanceof IsolatedLeader);
1173 class MockLeader extends Leader {
1175 FollowerToSnapshot fts;
1177 public MockLeader(RaftActorContext context){
1181 public FollowerToSnapshot getFollowerToSnapshot() {
1185 public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1186 fts = new FollowerToSnapshot(bs);
1187 setFollowerSnapshot(followerId, fts);
1191 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1193 private final long electionTimeOutIntervalMillis;
1194 private final int snapshotChunkSize;
1196 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1198 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1199 this.snapshotChunkSize = snapshotChunkSize;
1203 public FiniteDuration getElectionTimeOutInterval() {
1204 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1208 public int getSnapshotChunkSize() {
1209 return snapshotChunkSize;