1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.util.HashMap;
19 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Assert;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftState;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
47 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
48 import scala.concurrent.duration.FiniteDuration;
50 public class LeaderTest extends AbstractRaftActorBehaviorTest {
52 private ActorRef leaderActor =
53 getSystem().actorOf(Props.create(DoNothingActor.class));
54 private ActorRef senderActor =
55 getSystem().actorOf(Props.create(DoNothingActor.class));
58 public void testHandleMessageForUnknownMessage() throws Exception {
59 new JavaTestKit(getSystem()) {{
61 new Leader(createActorContext());
63 // handle message should return the Leader state when it receives an
65 RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
66 Assert.assertTrue(behavior instanceof Leader);
71 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
72 new JavaTestKit(getSystem()) {{
74 new Within(duration("1 seconds")) {
75 protected void run() {
77 ActorRef followerActor = getTestActor();
79 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
81 Map<String, String> peerAddresses = new HashMap<>();
83 peerAddresses.put(followerActor.path().toString(),
84 followerActor.path().toString());
86 actorContext.setPeerAddresses(peerAddresses);
88 Leader leader = new Leader(actorContext);
89 leader.handleMessage(senderActor, new SendHeartBeat());
92 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
93 // do not put code outside this method, will run afterwards
94 protected String match(Object in) {
95 Object msg = fromSerializableMessage(in);
96 if (msg instanceof AppendEntries) {
97 if (((AppendEntries)msg).getTerm() == 0) {
105 }.get(); // this extracts the received message
107 assertEquals("match", out);
115 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
116 new JavaTestKit(getSystem()) {{
118 new Within(duration("1 seconds")) {
119 protected void run() {
121 ActorRef followerActor = getTestActor();
123 MockRaftActorContext actorContext =
124 (MockRaftActorContext) createActorContext();
126 Map<String, String> peerAddresses = new HashMap<>();
128 peerAddresses.put(followerActor.path().toString(),
129 followerActor.path().toString());
131 actorContext.setPeerAddresses(peerAddresses);
133 Leader leader = new Leader(actorContext);
134 RaftActorBehavior raftBehavior = leader
135 .handleMessage(senderActor, new Replicate(null, null,
136 new MockRaftActorContext.MockReplicatedLogEntry(1,
138 new MockRaftActorContext.MockPayload("foo"))
141 // State should not change
142 assertTrue(raftBehavior instanceof Leader);
145 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
146 // do not put code outside this method, will run afterwards
147 protected String match(Object in) {
148 Object msg = fromSerializableMessage(in);
149 if (msg instanceof AppendEntries) {
150 if (((AppendEntries)msg).getTerm() == 0) {
158 }.get(); // this extracts the received message
160 assertEquals("match", out);
167 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
168 new JavaTestKit(getSystem()) {{
170 new Within(duration("1 seconds")) {
171 protected void run() {
173 ActorRef raftActor = getTestActor();
175 MockRaftActorContext actorContext =
176 new MockRaftActorContext("test", getSystem(), raftActor);
178 actorContext.getReplicatedLog().removeFrom(0);
180 actorContext.setReplicatedLog(
181 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
184 Leader leader = new Leader(actorContext);
185 RaftActorBehavior raftBehavior = leader
186 .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
188 // State should not change
189 assertTrue(raftBehavior instanceof Leader);
191 assertEquals(1, actorContext.getCommitIndex());
194 new ExpectMsg<String>(duration("1 seconds"),
196 // do not put code outside this method, will run afterwards
197 protected String match(Object in) {
198 if (in instanceof ApplyState) {
199 if (((ApplyState) in).getIdentifier().equals("state-id")) {
207 }.get(); // this extracts the received message
209 assertEquals("match", out);
217 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
218 new JavaTestKit(getSystem()) {{
219 ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
221 Map<String, String> peerAddresses = new HashMap<>();
222 peerAddresses.put(followerActor.path().toString(),
223 followerActor.path().toString());
225 MockRaftActorContext actorContext =
226 (MockRaftActorContext) createActorContext(leaderActor);
227 actorContext.setPeerAddresses(peerAddresses);
229 Map<String, String> leadersSnapshot = new HashMap<>();
230 leadersSnapshot.put("1", "A");
231 leadersSnapshot.put("2", "B");
232 leadersSnapshot.put("3", "C");
235 actorContext.getReplicatedLog().removeFrom(0);
237 final int followersLastIndex = 2;
238 final int snapshotIndex = 3;
239 final int newEntryIndex = 4;
240 final int snapshotTerm = 1;
241 final int currentTerm = 2;
243 // set the snapshot variables in replicatedlog
244 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
245 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
246 actorContext.setCommitIndex(followersLastIndex);
247 //set follower timeout to 2 mins, helps during debugging
248 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
250 MockLeader leader = new MockLeader(actorContext);
253 ReplicatedLogImplEntry entry =
254 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
255 new MockRaftActorContext.MockPayload("D"));
257 //update follower timestamp
258 leader.markFollowerActive(followerActor.path().toString());
260 ByteString bs = toByteString(leadersSnapshot);
261 leader.setSnapshot(Optional.of(bs));
262 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
264 //send first chunk and no InstallSnapshotReply received yet
265 leader.getFollowerToSnapshot().getNextChunk();
266 leader.getFollowerToSnapshot().incrementChunkIndex();
268 leader.handleMessage(leaderActor, new SendHeartBeat());
270 AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
271 followerActor, AppendEntries.SERIALIZABLE_CLASS);
273 assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
274 "received", aeproto);
276 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
278 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
280 //InstallSnapshotReply received
281 leader.getFollowerToSnapshot().markSendStatus(true);
283 leader.handleMessage(senderActor, new SendHeartBeat());
285 InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
286 MessageCollectorActor.getFirstMatching(followerActor,
287 InstallSnapshot.SERIALIZABLE_CLASS);
289 assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
292 InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
294 assertEquals(snapshotIndex, is.getLastIncludedIndex());
300 public void testSendAppendEntriesSnapshotScenario() {
301 new JavaTestKit(getSystem()) {{
303 ActorRef followerActor = getTestActor();
305 Map<String, String> peerAddresses = new HashMap<>();
306 peerAddresses.put(followerActor.path().toString(),
307 followerActor.path().toString());
309 MockRaftActorContext actorContext =
310 (MockRaftActorContext) createActorContext(getRef());
311 actorContext.setPeerAddresses(peerAddresses);
313 Map<String, String> leadersSnapshot = new HashMap<>();
314 leadersSnapshot.put("1", "A");
315 leadersSnapshot.put("2", "B");
316 leadersSnapshot.put("3", "C");
319 actorContext.getReplicatedLog().removeFrom(0);
321 final int followersLastIndex = 2;
322 final int snapshotIndex = 3;
323 final int newEntryIndex = 4;
324 final int snapshotTerm = 1;
325 final int currentTerm = 2;
327 // set the snapshot variables in replicatedlog
328 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
329 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
330 actorContext.setCommitIndex(followersLastIndex);
332 Leader leader = new Leader(actorContext);
335 ReplicatedLogImplEntry entry =
336 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
337 new MockRaftActorContext.MockPayload("D"));
339 //update follower timestamp
340 leader.markFollowerActive(followerActor.path().toString());
342 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
343 RaftActorBehavior raftBehavior = leader.handleMessage(
344 senderActor, new Replicate(null, "state-id", entry));
346 assertTrue(raftBehavior instanceof Leader);
348 // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
349 Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
351 protected Boolean match(Object o) throws Exception {
352 if (o instanceof InitiateInstallSnapshot) {
359 boolean initiateInitiateInstallSnapshot = false;
360 for (Boolean b: matches) {
361 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
364 assertTrue(initiateInitiateInstallSnapshot);
369 public void testInitiateInstallSnapshot() throws Exception {
370 new JavaTestKit(getSystem()) {{
372 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
374 ActorRef followerActor = getTestActor();
376 Map<String, String> peerAddresses = new HashMap<>();
377 peerAddresses.put(followerActor.path().toString(),
378 followerActor.path().toString());
381 MockRaftActorContext actorContext =
382 (MockRaftActorContext) createActorContext(leaderActor);
383 actorContext.setPeerAddresses(peerAddresses);
385 Map<String, String> leadersSnapshot = new HashMap<>();
386 leadersSnapshot.put("1", "A");
387 leadersSnapshot.put("2", "B");
388 leadersSnapshot.put("3", "C");
391 actorContext.getReplicatedLog().removeFrom(0);
393 final int followersLastIndex = 2;
394 final int snapshotIndex = 3;
395 final int newEntryIndex = 4;
396 final int snapshotTerm = 1;
397 final int currentTerm = 2;
399 // set the snapshot variables in replicatedlog
400 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
401 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
402 actorContext.setLastApplied(3);
403 actorContext.setCommitIndex(followersLastIndex);
405 Leader leader = new Leader(actorContext);
406 // set the snapshot as absent and check if capture-snapshot is invoked.
407 leader.setSnapshot(Optional.<ByteString>absent());
410 ReplicatedLogImplEntry entry =
411 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
412 new MockRaftActorContext.MockPayload("D"));
414 actorContext.getReplicatedLog().append(entry);
416 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
417 RaftActorBehavior raftBehavior = leader.handleMessage(
418 leaderActor, new InitiateInstallSnapshot());
420 CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
421 getFirstMatching(leaderActor, CaptureSnapshot.class);
425 assertTrue(cs.isInstallSnapshotInitiated());
426 assertEquals(3, cs.getLastAppliedIndex());
427 assertEquals(1, cs.getLastAppliedTerm());
428 assertEquals(4, cs.getLastIndex());
429 assertEquals(2, cs.getLastTerm());
434 public void testInstallSnapshot() {
435 new JavaTestKit(getSystem()) {{
437 ActorRef followerActor = getTestActor();
439 Map<String, String> peerAddresses = new HashMap<>();
440 peerAddresses.put(followerActor.path().toString(),
441 followerActor.path().toString());
443 MockRaftActorContext actorContext =
444 (MockRaftActorContext) createActorContext();
445 actorContext.setPeerAddresses(peerAddresses);
448 Map<String, String> leadersSnapshot = new HashMap<>();
449 leadersSnapshot.put("1", "A");
450 leadersSnapshot.put("2", "B");
451 leadersSnapshot.put("3", "C");
454 actorContext.getReplicatedLog().removeFrom(0);
456 final int followersLastIndex = 2;
457 final int snapshotIndex = 3;
458 final int newEntryIndex = 4;
459 final int snapshotTerm = 1;
460 final int currentTerm = 2;
462 // set the snapshot variables in replicatedlog
463 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
464 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
465 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
466 actorContext.setCommitIndex(followersLastIndex);
468 Leader leader = new Leader(actorContext);
471 ReplicatedLogImplEntry entry =
472 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
473 new MockRaftActorContext.MockPayload("D"));
475 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
476 new SendInstallSnapshot(toByteString(leadersSnapshot)));
478 assertTrue(raftBehavior instanceof Leader);
480 // check if installsnapshot gets called with the correct values.
482 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
483 // do not put code outside this method, will run afterwards
484 protected String match(Object in) {
485 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
486 InstallSnapshot is = (InstallSnapshot)
487 SerializationUtils.fromSerializable(in);
488 if (is.getData() == null) {
489 return "InstallSnapshot data is null";
491 if (is.getLastIncludedIndex() != snapshotIndex) {
492 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
494 if (is.getLastIncludedTerm() != snapshotTerm) {
495 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
497 if (is.getTerm() == currentTerm) {
498 return is.getTerm() + "!=" + currentTerm;
504 return "message mismatch:" + in.getClass();
507 }.get(); // this extracts the received message
509 assertEquals("match", out);
514 public void testHandleInstallSnapshotReplyLastChunk() {
515 new JavaTestKit(getSystem()) {{
517 ActorRef followerActor = getTestActor();
519 Map<String, String> peerAddresses = new HashMap<>();
520 peerAddresses.put(followerActor.path().toString(),
521 followerActor.path().toString());
523 final int followersLastIndex = 2;
524 final int snapshotIndex = 3;
525 final int newEntryIndex = 4;
526 final int snapshotTerm = 1;
527 final int currentTerm = 2;
529 MockRaftActorContext actorContext =
530 (MockRaftActorContext) createActorContext();
531 actorContext.setPeerAddresses(peerAddresses);
532 actorContext.setCommitIndex(followersLastIndex);
534 MockLeader leader = new MockLeader(actorContext);
536 Map<String, String> leadersSnapshot = new HashMap<>();
537 leadersSnapshot.put("1", "A");
538 leadersSnapshot.put("2", "B");
539 leadersSnapshot.put("3", "C");
541 // set the snapshot variables in replicatedlog
543 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
544 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
545 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
547 ByteString bs = toByteString(leadersSnapshot);
548 leader.setSnapshot(Optional.of(bs));
549 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
550 while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
551 leader.getFollowerToSnapshot().getNextChunk();
552 leader.getFollowerToSnapshot().incrementChunkIndex();
556 actorContext.getReplicatedLog().removeFrom(0);
558 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
559 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
560 leader.getFollowerToSnapshot().getChunkIndex(), true));
562 assertTrue(raftBehavior instanceof Leader);
564 assertEquals(0, leader.followerSnapshotSize());
565 assertEquals(1, leader.followerLogSize());
566 assertNotNull(leader.getFollower(followerActor.path().toString()));
567 FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
568 assertEquals(snapshotIndex, fli.getMatchIndex().get());
569 assertEquals(snapshotIndex, fli.getMatchIndex().get());
570 assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
575 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
576 new JavaTestKit(getSystem()) {{
578 TestActorRef<MessageCollectorActor> followerActor =
579 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
581 Map<String, String> peerAddresses = new HashMap<>();
582 peerAddresses.put(followerActor.path().toString(),
583 followerActor.path().toString());
585 final int followersLastIndex = 2;
586 final int snapshotIndex = 3;
587 final int snapshotTerm = 1;
588 final int currentTerm = 2;
590 MockRaftActorContext actorContext =
591 (MockRaftActorContext) createActorContext();
593 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
595 public int getSnapshotChunkSize() {
599 actorContext.setPeerAddresses(peerAddresses);
600 actorContext.setCommitIndex(followersLastIndex);
602 MockLeader leader = new MockLeader(actorContext);
604 Map<String, String> leadersSnapshot = new HashMap<>();
605 leadersSnapshot.put("1", "A");
606 leadersSnapshot.put("2", "B");
607 leadersSnapshot.put("3", "C");
609 // set the snapshot variables in replicatedlog
610 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
611 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
612 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
614 ByteString bs = toByteString(leadersSnapshot);
615 leader.setSnapshot(Optional.of(bs));
617 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
619 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
621 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
623 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
625 assertEquals(1, installSnapshot.getChunkIndex());
626 assertEquals(3, installSnapshot.getTotalChunks());
629 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
631 leader.handleMessage(leaderActor, new SendHeartBeat());
633 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
635 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
637 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
639 assertEquals(1, installSnapshot.getChunkIndex());
640 assertEquals(3, installSnapshot.getTotalChunks());
642 followerActor.tell(PoisonPill.getInstance(), getRef());
647 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
648 new JavaTestKit(getSystem()) {
651 TestActorRef<MessageCollectorActor> followerActor =
652 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
654 Map<String, String> peerAddresses = new HashMap<>();
655 peerAddresses.put(followerActor.path().toString(),
656 followerActor.path().toString());
658 final int followersLastIndex = 2;
659 final int snapshotIndex = 3;
660 final int snapshotTerm = 1;
661 final int currentTerm = 2;
663 MockRaftActorContext actorContext =
664 (MockRaftActorContext) createActorContext();
666 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
668 public int getSnapshotChunkSize() {
672 actorContext.setPeerAddresses(peerAddresses);
673 actorContext.setCommitIndex(followersLastIndex);
675 MockLeader leader = new MockLeader(actorContext);
677 Map<String, String> leadersSnapshot = new HashMap<>();
678 leadersSnapshot.put("1", "A");
679 leadersSnapshot.put("2", "B");
680 leadersSnapshot.put("3", "C");
682 // set the snapshot variables in replicatedlog
683 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
684 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
685 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
687 ByteString bs = toByteString(leadersSnapshot);
688 leader.setSnapshot(Optional.of(bs));
690 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
692 Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
694 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
696 InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
698 assertEquals(1, installSnapshot.getChunkIndex());
699 assertEquals(3, installSnapshot.getTotalChunks());
700 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
702 int hashCode = installSnapshot.getData().hashCode();
704 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
706 leader.handleMessage(leaderActor, new SendHeartBeat());
708 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
710 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
712 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
714 installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
716 assertEquals(2, installSnapshot.getChunkIndex());
717 assertEquals(3, installSnapshot.getTotalChunks());
718 assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
720 followerActor.tell(PoisonPill.getInstance(), getRef());
725 public void testFollowerToSnapshotLogic() {
727 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
729 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
731 public int getSnapshotChunkSize() {
736 MockLeader leader = new MockLeader(actorContext);
738 Map<String, String> leadersSnapshot = new HashMap<>();
739 leadersSnapshot.put("1", "A");
740 leadersSnapshot.put("2", "B");
741 leadersSnapshot.put("3", "C");
743 ByteString bs = toByteString(leadersSnapshot);
744 byte[] barray = bs.toByteArray();
746 leader.createFollowerToSnapshot("followerId", bs);
747 assertEquals(bs.size(), barray.length);
750 for (int i=0; i < barray.length; i = i + 50) {
754 if (i + 50 > barray.length) {
758 ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
759 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
760 assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
762 leader.getFollowerToSnapshot().markSendStatus(true);
763 if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
764 leader.getFollowerToSnapshot().incrementChunkIndex();
768 assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
772 @Override protected RaftActorBehavior createBehavior(
773 RaftActorContext actorContext) {
774 return new Leader(actorContext);
777 @Override protected RaftActorContext createActorContext() {
778 return createActorContext(leaderActor);
781 protected RaftActorContext createActorContext(ActorRef actorRef) {
782 return new MockRaftActorContext("test", getSystem(), actorRef);
785 private ByteString toByteString(Map<String, String> state) {
786 ByteArrayOutputStream b = null;
787 ObjectOutputStream o = null;
790 b = new ByteArrayOutputStream();
791 o = new ObjectOutputStream(b);
792 o.writeObject(state);
793 byte[] snapshotBytes = b.toByteArray();
794 return ByteString.copyFrom(snapshotBytes);
804 } catch (IOException e) {
805 Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
810 public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
811 private static AbstractRaftActorBehavior behavior;
813 public ForwardMessageToBehaviorActor(){
817 @Override public void onReceive(Object message) throws Exception {
818 super.onReceive(message);
819 behavior.handleMessage(sender(), message);
822 public static void setBehavior(AbstractRaftActorBehavior behavior){
823 ForwardMessageToBehaviorActor.behavior = behavior;
828 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
829 new JavaTestKit(getSystem()) {{
831 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
833 MockRaftActorContext leaderActorContext =
834 new MockRaftActorContext("leader", getSystem(), leaderActor);
836 ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
838 MockRaftActorContext followerActorContext =
839 new MockRaftActorContext("follower", getSystem(), followerActor);
841 Follower follower = new Follower(followerActorContext);
843 ForwardMessageToBehaviorActor.setBehavior(follower);
845 Map<String, String> peerAddresses = new HashMap<>();
846 peerAddresses.put(followerActor.path().toString(),
847 followerActor.path().toString());
849 leaderActorContext.setPeerAddresses(peerAddresses);
851 leaderActorContext.getReplicatedLog().removeFrom(0);
854 leaderActorContext.setReplicatedLog(
855 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
857 leaderActorContext.setCommitIndex(1);
859 followerActorContext.getReplicatedLog().removeFrom(0);
861 // follower too has the exact same log entries and has the same commit index
862 followerActorContext.setReplicatedLog(
863 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
865 followerActorContext.setCommitIndex(1);
867 Leader leader = new Leader(leaderActorContext);
868 leader.markFollowerActive(followerActor.path().toString());
870 leader.handleMessage(leaderActor, new SendHeartBeat());
872 AppendEntriesMessages.AppendEntries appendEntries =
873 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
874 .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
876 assertNotNull(appendEntries);
878 assertEquals(1, appendEntries.getLeaderCommit());
879 assertEquals(1, appendEntries.getLogEntries(0).getIndex());
880 assertEquals(0, appendEntries.getPrevLogIndex());
882 AppendEntriesReply appendEntriesReply =
883 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
884 leaderActor, AppendEntriesReply.class);
886 assertNotNull(appendEntriesReply);
888 // follower returns its next index
889 assertEquals(2, appendEntriesReply.getLogLastIndex());
890 assertEquals(1, appendEntriesReply.getLogLastTerm());
897 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
898 new JavaTestKit(getSystem()) {{
900 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
902 MockRaftActorContext leaderActorContext =
903 new MockRaftActorContext("leader", getSystem(), leaderActor);
905 ActorRef followerActor = getSystem().actorOf(
906 Props.create(ForwardMessageToBehaviorActor.class));
908 MockRaftActorContext followerActorContext =
909 new MockRaftActorContext("follower", getSystem(), followerActor);
911 Follower follower = new Follower(followerActorContext);
913 ForwardMessageToBehaviorActor.setBehavior(follower);
915 Map<String, String> peerAddresses = new HashMap<>();
916 peerAddresses.put(followerActor.path().toString(),
917 followerActor.path().toString());
919 leaderActorContext.setPeerAddresses(peerAddresses);
921 leaderActorContext.getReplicatedLog().removeFrom(0);
923 leaderActorContext.setReplicatedLog(
924 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
926 leaderActorContext.setCommitIndex(1);
928 followerActorContext.getReplicatedLog().removeFrom(0);
930 followerActorContext.setReplicatedLog(
931 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
933 // follower has the same log entries but its commit index > leaders commit index
934 followerActorContext.setCommitIndex(2);
936 Leader leader = new Leader(leaderActorContext);
937 leader.markFollowerActive(followerActor.path().toString());
939 leader.handleMessage(leaderActor, new SendHeartBeat());
941 AppendEntriesMessages.AppendEntries appendEntries =
942 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
943 .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
945 assertNotNull(appendEntries);
947 assertEquals(1, appendEntries.getLeaderCommit());
948 assertEquals(1, appendEntries.getLogEntries(0).getIndex());
949 assertEquals(0, appendEntries.getPrevLogIndex());
951 AppendEntriesReply appendEntriesReply =
952 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
953 leaderActor, AppendEntriesReply.class);
955 assertNotNull(appendEntriesReply);
957 assertEquals(2, appendEntriesReply.getLogLastIndex());
958 assertEquals(1, appendEntriesReply.getLogLastTerm());
964 public void testHandleAppendEntriesReplyFailure(){
965 new JavaTestKit(getSystem()) {
968 ActorRef leaderActor =
969 getSystem().actorOf(Props.create(MessageCollectorActor.class));
971 ActorRef followerActor =
972 getSystem().actorOf(Props.create(MessageCollectorActor.class));
975 MockRaftActorContext leaderActorContext =
976 new MockRaftActorContext("leader", getSystem(), leaderActor);
978 Map<String, String> peerAddresses = new HashMap<>();
979 peerAddresses.put("follower-1",
980 followerActor.path().toString());
982 leaderActorContext.setPeerAddresses(peerAddresses);
984 Leader leader = new Leader(leaderActorContext);
986 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
988 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
990 assertEquals(RaftState.Leader, raftActorBehavior.state());
996 public void testHandleAppendEntriesReplySuccess() throws Exception {
997 new JavaTestKit(getSystem()) {
1000 ActorRef leaderActor =
1001 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1003 ActorRef followerActor =
1004 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1007 MockRaftActorContext leaderActorContext =
1008 new MockRaftActorContext("leader", getSystem(), leaderActor);
1010 leaderActorContext.setReplicatedLog(
1011 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1013 Map<String, String> peerAddresses = new HashMap<>();
1014 peerAddresses.put("follower-1",
1015 followerActor.path().toString());
1017 leaderActorContext.setPeerAddresses(peerAddresses);
1018 leaderActorContext.setCommitIndex(1);
1019 leaderActorContext.setLastApplied(1);
1020 leaderActorContext.getTermInformation().update(1, "leader");
1022 Leader leader = new Leader(leaderActorContext);
1024 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
1026 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1028 assertEquals(RaftState.Leader, raftActorBehavior.state());
1030 assertEquals(2, leaderActorContext.getCommitIndex());
1032 ApplyLogEntries applyLogEntries =
1033 (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
1034 ApplyLogEntries.class);
1036 assertNotNull(applyLogEntries);
1038 assertEquals(2, leaderActorContext.getLastApplied());
1040 assertEquals(2, applyLogEntries.getToIndex());
1042 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1045 assertEquals(1,applyStateList.size());
1047 ApplyState applyState = (ApplyState) applyStateList.get(0);
1049 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1055 public void testHandleAppendEntriesReplyUnknownFollower(){
1056 new JavaTestKit(getSystem()) {
1059 ActorRef leaderActor =
1060 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1062 MockRaftActorContext leaderActorContext =
1063 new MockRaftActorContext("leader", getSystem(), leaderActor);
1065 Leader leader = new Leader(leaderActorContext);
1067 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
1069 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
1071 assertEquals(RaftState.Leader, raftActorBehavior.state());
1077 public void testHandleRequestVoteReply(){
1078 new JavaTestKit(getSystem()) {
1081 ActorRef leaderActor =
1082 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1084 MockRaftActorContext leaderActorContext =
1085 new MockRaftActorContext("leader", getSystem(), leaderActor);
1087 Leader leader = new Leader(leaderActorContext);
1089 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
1091 assertEquals(RaftState.Leader, raftActorBehavior.state());
1093 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
1095 assertEquals(RaftState.Leader, raftActorBehavior.state());
1100 public void testIsolatedLeaderCheckNoFollowers() {
1101 new JavaTestKit(getSystem()) {{
1102 ActorRef leaderActor = getTestActor();
1104 MockRaftActorContext leaderActorContext =
1105 new MockRaftActorContext("leader", getSystem(), leaderActor);
1107 Map<String, String> peerAddresses = new HashMap<>();
1108 leaderActorContext.setPeerAddresses(peerAddresses);
1110 Leader leader = new Leader(leaderActorContext);
1111 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1112 Assert.assertTrue(behavior instanceof Leader);
1117 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1118 new JavaTestKit(getSystem()) {{
1120 ActorRef followerActor1 = getTestActor();
1121 ActorRef followerActor2 = getTestActor();
1123 MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
1125 Map<String, String> peerAddresses = new HashMap<>();
1126 peerAddresses.put("follower-1", followerActor1.path().toString());
1127 peerAddresses.put("follower-2", followerActor2.path().toString());
1129 leaderActorContext.setPeerAddresses(peerAddresses);
1131 Leader leader = new Leader(leaderActorContext);
1132 leader.stopIsolatedLeaderCheckSchedule();
1134 leader.markFollowerActive("follower-1");
1135 leader.markFollowerActive("follower-2");
1136 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1137 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1138 behavior instanceof Leader);
1140 // kill 1 follower and verify if that got killed
1141 final JavaTestKit probe = new JavaTestKit(getSystem());
1142 probe.watch(followerActor1);
1143 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1144 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1145 assertEquals(termMsg1.getActor(), followerActor1);
1147 leader.markFollowerInActive("follower-1");
1148 leader.markFollowerActive("follower-2");
1149 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1150 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1151 behavior instanceof Leader);
1153 // kill 2nd follower and leader should change to Isolated leader
1154 followerActor2.tell(PoisonPill.getInstance(), null);
1155 probe.watch(followerActor2);
1156 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1157 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1158 assertEquals(termMsg2.getActor(), followerActor2);
1160 leader.markFollowerInActive("follower-2");
1161 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1162 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1163 behavior instanceof IsolatedLeader);
1168 class MockLeader extends Leader {
1170 FollowerToSnapshot fts;
1172 public MockLeader(RaftActorContext context){
1176 public FollowerToSnapshot getFollowerToSnapshot() {
1180 public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1181 fts = new FollowerToSnapshot(bs);
1182 setFollowerSnapshot(followerId, fts);
1186 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1188 private long electionTimeOutIntervalMillis;
1189 private int snapshotChunkSize;
1191 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1193 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1194 this.snapshotChunkSize = snapshotChunkSize;
1198 public FiniteDuration getElectionTimeOutInterval() {
1199 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1203 public int getSnapshotChunkSize() {
1204 return snapshotChunkSize;