1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import akka.actor.ActorRef;
4 import akka.actor.PoisonPill;
5 import akka.actor.Props;
6 import akka.actor.Terminated;
7 import akka.testkit.JavaTestKit;
8 import com.google.common.base.Optional;
9 import com.google.protobuf.ByteString;
10 import java.io.ByteArrayOutputStream;
11 import java.io.IOException;
12 import java.io.ObjectOutputStream;
13 import java.util.HashMap;
14 import java.util.List;
16 import java.util.concurrent.TimeUnit;
17 import org.junit.Assert;
18 import org.junit.Test;
19 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
20 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
21 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
22 import org.opendaylight.controller.cluster.raft.RaftActorContext;
23 import org.opendaylight.controller.cluster.raft.RaftState;
24 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
25 import org.opendaylight.controller.cluster.raft.SerializationUtils;
26 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
27 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
28 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
29 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
30 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
31 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
32 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
33 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
34 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
35 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
36 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
37 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
38 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
39 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
40 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
41 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
42 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
43 import scala.concurrent.duration.FiniteDuration;
44 import static org.junit.Assert.assertEquals;
45 import static org.junit.Assert.assertNotNull;
46 import static org.junit.Assert.assertTrue;
48 public class LeaderTest extends AbstractRaftActorBehaviorTest {
50 private ActorRef leaderActor =
51 getSystem().actorOf(Props.create(DoNothingActor.class));
52 private ActorRef senderActor =
53 getSystem().actorOf(Props.create(DoNothingActor.class));
56 public void testHandleMessageForUnknownMessage() throws Exception {
57 new JavaTestKit(getSystem()) {{
59 new Leader(createActorContext());
61 // handle message should return the Leader state when it receives an
63 RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
64 Assert.assertTrue(behavior instanceof Leader);
70 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
71 new JavaTestKit(getSystem()) {{
73 new Within(duration("1 seconds")) {
74 protected void run() {
76 ActorRef followerActor = getTestActor();
78 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
80 Map<String, String> peerAddresses = new HashMap<>();
82 peerAddresses.put(followerActor.path().toString(),
83 followerActor.path().toString());
85 actorContext.setPeerAddresses(peerAddresses);
87 Leader leader = new Leader(actorContext);
88 leader.handleMessage(senderActor, new SendHeartBeat());
91 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
92 // do not put code outside this method, will run afterwards
93 protected String match(Object in) {
94 Object msg = fromSerializableMessage(in);
95 if (msg instanceof AppendEntries) {
96 if (((AppendEntries)msg).getTerm() == 0) {
104 }.get(); // this extracts the received message
106 assertEquals("match", out);
114 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
115 new JavaTestKit(getSystem()) {{
117 new Within(duration("1 seconds")) {
118 protected void run() {
120 ActorRef followerActor = getTestActor();
122 MockRaftActorContext actorContext =
123 (MockRaftActorContext) createActorContext();
125 Map<String, String> peerAddresses = new HashMap<>();
127 peerAddresses.put(followerActor.path().toString(),
128 followerActor.path().toString());
130 actorContext.setPeerAddresses(peerAddresses);
132 Leader leader = new Leader(actorContext);
133 RaftActorBehavior raftBehavior = leader
134 .handleMessage(senderActor, new Replicate(null, null,
135 new MockRaftActorContext.MockReplicatedLogEntry(1,
137 new MockRaftActorContext.MockPayload("foo"))
140 // State should not change
141 assertTrue(raftBehavior instanceof Leader);
144 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
145 // do not put code outside this method, will run afterwards
146 protected String match(Object in) {
147 Object msg = fromSerializableMessage(in);
148 if (msg instanceof AppendEntries) {
149 if (((AppendEntries)msg).getTerm() == 0) {
157 }.get(); // this extracts the received message
159 assertEquals("match", out);
166 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
167 new JavaTestKit(getSystem()) {{
169 new Within(duration("1 seconds")) {
170 protected void run() {
172 ActorRef raftActor = getTestActor();
174 MockRaftActorContext actorContext =
175 new MockRaftActorContext("test", getSystem(), raftActor);
177 actorContext.getReplicatedLog().removeFrom(0);
179 actorContext.setReplicatedLog(
180 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
183 Leader leader = new Leader(actorContext);
184 RaftActorBehavior raftBehavior = leader
185 .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
187 // State should not change
188 assertTrue(raftBehavior instanceof Leader);
190 assertEquals(1, actorContext.getCommitIndex());
193 new ExpectMsg<String>(duration("1 seconds"),
195 // do not put code outside this method, will run afterwards
196 protected String match(Object in) {
197 if (in instanceof ApplyState) {
198 if (((ApplyState) in).getIdentifier().equals("state-id")) {
206 }.get(); // this extracts the received message
208 assertEquals("match", out);
216 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
217 new JavaTestKit(getSystem()) {{
218 ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
220 Map<String, String> peerAddresses = new HashMap<>();
221 peerAddresses.put(followerActor.path().toString(),
222 followerActor.path().toString());
224 MockRaftActorContext actorContext =
225 (MockRaftActorContext) createActorContext(leaderActor);
226 actorContext.setPeerAddresses(peerAddresses);
228 Map<String, String> leadersSnapshot = new HashMap<>();
229 leadersSnapshot.put("1", "A");
230 leadersSnapshot.put("2", "B");
231 leadersSnapshot.put("3", "C");
234 actorContext.getReplicatedLog().removeFrom(0);
236 final int followersLastIndex = 2;
237 final int snapshotIndex = 3;
238 final int newEntryIndex = 4;
239 final int snapshotTerm = 1;
240 final int currentTerm = 2;
242 // set the snapshot variables in replicatedlog
243 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
244 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
245 actorContext.setCommitIndex(followersLastIndex);
246 //set follower timeout to 2 mins, helps during debugging
247 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
249 MockLeader leader = new MockLeader(actorContext);
252 ReplicatedLogImplEntry entry =
253 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
254 new MockRaftActorContext.MockPayload("D"));
256 //update follower timestamp
257 leader.markFollowerActive(followerActor.path().toString());
259 ByteString bs = toByteString(leadersSnapshot);
260 leader.setSnapshot(Optional.of(bs));
261 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
263 //send first chunk and no InstallSnapshotReply received yet
264 leader.getFollowerToSnapshot().getNextChunk();
265 leader.getFollowerToSnapshot().incrementChunkIndex();
267 leader.handleMessage(leaderActor, new SendHeartBeat());
269 AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
270 followerActor, AppendEntries.SERIALIZABLE_CLASS);
272 assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
273 "received", aeproto);
275 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
277 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
279 //InstallSnapshotReply received
280 leader.getFollowerToSnapshot().markSendStatus(true);
282 leader.handleMessage(senderActor, new SendHeartBeat());
284 InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
285 MessageCollectorActor.getFirstMatching(followerActor,
286 InstallSnapshot.SERIALIZABLE_CLASS);
288 assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
291 InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
293 assertEquals(snapshotIndex, is.getLastIncludedIndex());
299 public void testSendAppendEntriesSnapshotScenario() {
300 new JavaTestKit(getSystem()) {{
302 ActorRef followerActor = getTestActor();
304 Map<String, String> peerAddresses = new HashMap<>();
305 peerAddresses.put(followerActor.path().toString(),
306 followerActor.path().toString());
308 MockRaftActorContext actorContext =
309 (MockRaftActorContext) createActorContext(getRef());
310 actorContext.setPeerAddresses(peerAddresses);
312 Map<String, String> leadersSnapshot = new HashMap<>();
313 leadersSnapshot.put("1", "A");
314 leadersSnapshot.put("2", "B");
315 leadersSnapshot.put("3", "C");
318 actorContext.getReplicatedLog().removeFrom(0);
320 final int followersLastIndex = 2;
321 final int snapshotIndex = 3;
322 final int newEntryIndex = 4;
323 final int snapshotTerm = 1;
324 final int currentTerm = 2;
326 // set the snapshot variables in replicatedlog
327 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
328 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
329 actorContext.setCommitIndex(followersLastIndex);
331 Leader leader = new Leader(actorContext);
334 ReplicatedLogImplEntry entry =
335 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
336 new MockRaftActorContext.MockPayload("D"));
338 //update follower timestamp
339 leader.markFollowerActive(followerActor.path().toString());
341 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
342 RaftActorBehavior raftBehavior = leader.handleMessage(
343 senderActor, new Replicate(null, "state-id", entry));
345 assertTrue(raftBehavior instanceof Leader);
347 // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
348 Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
350 protected Boolean match(Object o) throws Exception {
351 if (o instanceof InitiateInstallSnapshot) {
358 boolean initiateInitiateInstallSnapshot = false;
359 for (Boolean b: matches) {
360 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
363 assertTrue(initiateInitiateInstallSnapshot);
368 public void testInitiateInstallSnapshot() throws Exception {
369 new JavaTestKit(getSystem()) {{
371 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
373 ActorRef followerActor = getTestActor();
375 Map<String, String> peerAddresses = new HashMap<>();
376 peerAddresses.put(followerActor.path().toString(),
377 followerActor.path().toString());
380 MockRaftActorContext actorContext =
381 (MockRaftActorContext) createActorContext(leaderActor);
382 actorContext.setPeerAddresses(peerAddresses);
384 Map<String, String> leadersSnapshot = new HashMap<>();
385 leadersSnapshot.put("1", "A");
386 leadersSnapshot.put("2", "B");
387 leadersSnapshot.put("3", "C");
390 actorContext.getReplicatedLog().removeFrom(0);
392 final int followersLastIndex = 2;
393 final int snapshotIndex = 3;
394 final int newEntryIndex = 4;
395 final int snapshotTerm = 1;
396 final int currentTerm = 2;
398 // set the snapshot variables in replicatedlog
399 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
400 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
401 actorContext.setLastApplied(3);
402 actorContext.setCommitIndex(followersLastIndex);
404 Leader leader = new Leader(actorContext);
405 // set the snapshot as absent and check if capture-snapshot is invoked.
406 leader.setSnapshot(Optional.<ByteString>absent());
409 ReplicatedLogImplEntry entry =
410 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
411 new MockRaftActorContext.MockPayload("D"));
413 actorContext.getReplicatedLog().append(entry);
415 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
416 RaftActorBehavior raftBehavior = leader.handleMessage(
417 leaderActor, new InitiateInstallSnapshot());
419 CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
420 getFirstMatching(leaderActor, CaptureSnapshot.class);
424 assertTrue(cs.isInstallSnapshotInitiated());
425 assertEquals(3, cs.getLastAppliedIndex());
426 assertEquals(1, cs.getLastAppliedTerm());
427 assertEquals(4, cs.getLastIndex());
428 assertEquals(2, cs.getLastTerm());
433 public void testInstallSnapshot() {
434 new JavaTestKit(getSystem()) {{
436 ActorRef followerActor = getTestActor();
438 Map<String, String> peerAddresses = new HashMap<>();
439 peerAddresses.put(followerActor.path().toString(),
440 followerActor.path().toString());
442 MockRaftActorContext actorContext =
443 (MockRaftActorContext) createActorContext();
444 actorContext.setPeerAddresses(peerAddresses);
447 Map<String, String> leadersSnapshot = new HashMap<>();
448 leadersSnapshot.put("1", "A");
449 leadersSnapshot.put("2", "B");
450 leadersSnapshot.put("3", "C");
453 actorContext.getReplicatedLog().removeFrom(0);
455 final int followersLastIndex = 2;
456 final int snapshotIndex = 3;
457 final int newEntryIndex = 4;
458 final int snapshotTerm = 1;
459 final int currentTerm = 2;
461 // set the snapshot variables in replicatedlog
462 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
463 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
464 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
465 actorContext.setCommitIndex(followersLastIndex);
467 Leader leader = new Leader(actorContext);
470 ReplicatedLogImplEntry entry =
471 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
472 new MockRaftActorContext.MockPayload("D"));
474 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
475 new SendInstallSnapshot(toByteString(leadersSnapshot)));
477 assertTrue(raftBehavior instanceof Leader);
479 // check if installsnapshot gets called with the correct values.
481 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
482 // do not put code outside this method, will run afterwards
483 protected String match(Object in) {
484 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
485 InstallSnapshot is = (InstallSnapshot)
486 SerializationUtils.fromSerializable(in);
487 if (is.getData() == null) {
488 return "InstallSnapshot data is null";
490 if (is.getLastIncludedIndex() != snapshotIndex) {
491 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
493 if (is.getLastIncludedTerm() != snapshotTerm) {
494 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
496 if (is.getTerm() == currentTerm) {
497 return is.getTerm() + "!=" + currentTerm;
503 return "message mismatch:" + in.getClass();
506 }.get(); // this extracts the received message
508 assertEquals("match", out);
513 public void testHandleInstallSnapshotReplyLastChunk() {
514 new JavaTestKit(getSystem()) {{
516 ActorRef followerActor = getTestActor();
518 Map<String, String> peerAddresses = new HashMap<>();
519 peerAddresses.put(followerActor.path().toString(),
520 followerActor.path().toString());
522 final int followersLastIndex = 2;
523 final int snapshotIndex = 3;
524 final int newEntryIndex = 4;
525 final int snapshotTerm = 1;
526 final int currentTerm = 2;
528 MockRaftActorContext actorContext =
529 (MockRaftActorContext) createActorContext();
530 actorContext.setPeerAddresses(peerAddresses);
531 actorContext.setCommitIndex(followersLastIndex);
533 MockLeader leader = new MockLeader(actorContext);
535 Map<String, String> leadersSnapshot = new HashMap<>();
536 leadersSnapshot.put("1", "A");
537 leadersSnapshot.put("2", "B");
538 leadersSnapshot.put("3", "C");
540 // set the snapshot variables in replicatedlog
542 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
543 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
544 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
546 ByteString bs = toByteString(leadersSnapshot);
547 leader.setSnapshot(Optional.of(bs));
548 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
549 while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
550 leader.getFollowerToSnapshot().getNextChunk();
551 leader.getFollowerToSnapshot().incrementChunkIndex();
555 actorContext.getReplicatedLog().removeFrom(0);
557 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
558 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
559 leader.getFollowerToSnapshot().getChunkIndex(), true));
561 assertTrue(raftBehavior instanceof Leader);
563 assertEquals(leader.mapFollowerToSnapshot.size(), 0);
564 assertEquals(leader.followerToLog.size(), 1);
565 assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
566 FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
567 assertEquals(snapshotIndex, fli.getMatchIndex().get());
568 assertEquals(snapshotIndex, fli.getMatchIndex().get());
569 assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
574 public void testFollowerToSnapshotLogic() {
576 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
578 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
580 public int getSnapshotChunkSize() {
585 MockLeader leader = new MockLeader(actorContext);
587 Map<String, String> leadersSnapshot = new HashMap<>();
588 leadersSnapshot.put("1", "A");
589 leadersSnapshot.put("2", "B");
590 leadersSnapshot.put("3", "C");
592 ByteString bs = toByteString(leadersSnapshot);
593 byte[] barray = bs.toByteArray();
595 leader.createFollowerToSnapshot("followerId", bs);
596 assertEquals(bs.size(), barray.length);
599 for (int i=0; i < barray.length; i = i + 50) {
603 if (i + 50 > barray.length) {
607 ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
608 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
609 assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
611 leader.getFollowerToSnapshot().markSendStatus(true);
612 if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
613 leader.getFollowerToSnapshot().incrementChunkIndex();
617 assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
621 @Override protected RaftActorBehavior createBehavior(
622 RaftActorContext actorContext) {
623 return new Leader(actorContext);
626 @Override protected RaftActorContext createActorContext() {
627 return createActorContext(leaderActor);
630 protected RaftActorContext createActorContext(ActorRef actorRef) {
631 return new MockRaftActorContext("test", getSystem(), actorRef);
634 private ByteString toByteString(Map<String, String> state) {
635 ByteArrayOutputStream b = null;
636 ObjectOutputStream o = null;
639 b = new ByteArrayOutputStream();
640 o = new ObjectOutputStream(b);
641 o.writeObject(state);
642 byte[] snapshotBytes = b.toByteArray();
643 return ByteString.copyFrom(snapshotBytes);
653 } catch (IOException e) {
654 Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
659 public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
660 private static AbstractRaftActorBehavior behavior;
662 public ForwardMessageToBehaviorActor(){
666 @Override public void onReceive(Object message) throws Exception {
667 super.onReceive(message);
668 behavior.handleMessage(sender(), message);
671 public static void setBehavior(AbstractRaftActorBehavior behavior){
672 ForwardMessageToBehaviorActor.behavior = behavior;
677 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
678 new JavaTestKit(getSystem()) {{
680 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
682 MockRaftActorContext leaderActorContext =
683 new MockRaftActorContext("leader", getSystem(), leaderActor);
685 ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
687 MockRaftActorContext followerActorContext =
688 new MockRaftActorContext("follower", getSystem(), followerActor);
690 Follower follower = new Follower(followerActorContext);
692 ForwardMessageToBehaviorActor.setBehavior(follower);
694 Map<String, String> peerAddresses = new HashMap<>();
695 peerAddresses.put(followerActor.path().toString(),
696 followerActor.path().toString());
698 leaderActorContext.setPeerAddresses(peerAddresses);
700 leaderActorContext.getReplicatedLog().removeFrom(0);
703 leaderActorContext.setReplicatedLog(
704 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
706 leaderActorContext.setCommitIndex(1);
708 followerActorContext.getReplicatedLog().removeFrom(0);
710 // follower too has the exact same log entries and has the same commit index
711 followerActorContext.setReplicatedLog(
712 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
714 followerActorContext.setCommitIndex(1);
716 Leader leader = new Leader(leaderActorContext);
717 leader.markFollowerActive(followerActor.path().toString());
719 leader.handleMessage(leaderActor, new SendHeartBeat());
721 AppendEntriesMessages.AppendEntries appendEntries =
722 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
723 .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
725 assertNotNull(appendEntries);
727 assertEquals(1, appendEntries.getLeaderCommit());
728 assertEquals(1, appendEntries.getLogEntries(0).getIndex());
729 assertEquals(0, appendEntries.getPrevLogIndex());
731 AppendEntriesReply appendEntriesReply =
732 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
733 leaderActor, AppendEntriesReply.class);
735 assertNotNull(appendEntriesReply);
737 // follower returns its next index
738 assertEquals(2, appendEntriesReply.getLogLastIndex());
739 assertEquals(1, appendEntriesReply.getLogLastTerm());
746 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
747 new JavaTestKit(getSystem()) {{
749 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
751 MockRaftActorContext leaderActorContext =
752 new MockRaftActorContext("leader", getSystem(), leaderActor);
754 ActorRef followerActor = getSystem().actorOf(
755 Props.create(ForwardMessageToBehaviorActor.class));
757 MockRaftActorContext followerActorContext =
758 new MockRaftActorContext("follower", getSystem(), followerActor);
760 Follower follower = new Follower(followerActorContext);
762 ForwardMessageToBehaviorActor.setBehavior(follower);
764 Map<String, String> peerAddresses = new HashMap<>();
765 peerAddresses.put(followerActor.path().toString(),
766 followerActor.path().toString());
768 leaderActorContext.setPeerAddresses(peerAddresses);
770 leaderActorContext.getReplicatedLog().removeFrom(0);
772 leaderActorContext.setReplicatedLog(
773 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
775 leaderActorContext.setCommitIndex(1);
777 followerActorContext.getReplicatedLog().removeFrom(0);
779 followerActorContext.setReplicatedLog(
780 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
782 // follower has the same log entries but its commit index > leaders commit index
783 followerActorContext.setCommitIndex(2);
785 Leader leader = new Leader(leaderActorContext);
786 leader.markFollowerActive(followerActor.path().toString());
788 leader.handleMessage(leaderActor, new SendHeartBeat());
790 AppendEntriesMessages.AppendEntries appendEntries =
791 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
792 .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
794 assertNotNull(appendEntries);
796 assertEquals(1, appendEntries.getLeaderCommit());
797 assertEquals(1, appendEntries.getLogEntries(0).getIndex());
798 assertEquals(0, appendEntries.getPrevLogIndex());
800 AppendEntriesReply appendEntriesReply =
801 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
802 leaderActor, AppendEntriesReply.class);
804 assertNotNull(appendEntriesReply);
806 assertEquals(2, appendEntriesReply.getLogLastIndex());
807 assertEquals(1, appendEntriesReply.getLogLastTerm());
813 public void testHandleAppendEntriesReplyFailure(){
814 new JavaTestKit(getSystem()) {
817 ActorRef leaderActor =
818 getSystem().actorOf(Props.create(MessageCollectorActor.class));
820 ActorRef followerActor =
821 getSystem().actorOf(Props.create(MessageCollectorActor.class));
824 MockRaftActorContext leaderActorContext =
825 new MockRaftActorContext("leader", getSystem(), leaderActor);
827 Map<String, String> peerAddresses = new HashMap<>();
828 peerAddresses.put("follower-1",
829 followerActor.path().toString());
831 leaderActorContext.setPeerAddresses(peerAddresses);
833 Leader leader = new Leader(leaderActorContext);
835 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
837 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
839 assertEquals(RaftState.Leader, raftActorBehavior.state());
845 public void testHandleAppendEntriesReplySuccess() throws Exception {
846 new JavaTestKit(getSystem()) {
849 ActorRef leaderActor =
850 getSystem().actorOf(Props.create(MessageCollectorActor.class));
852 ActorRef followerActor =
853 getSystem().actorOf(Props.create(MessageCollectorActor.class));
856 MockRaftActorContext leaderActorContext =
857 new MockRaftActorContext("leader", getSystem(), leaderActor);
859 leaderActorContext.setReplicatedLog(
860 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
862 Map<String, String> peerAddresses = new HashMap<>();
863 peerAddresses.put("follower-1",
864 followerActor.path().toString());
866 leaderActorContext.setPeerAddresses(peerAddresses);
867 leaderActorContext.setCommitIndex(1);
868 leaderActorContext.setLastApplied(1);
869 leaderActorContext.getTermInformation().update(1, "leader");
871 Leader leader = new Leader(leaderActorContext);
873 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
875 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
877 assertEquals(RaftState.Leader, raftActorBehavior.state());
879 assertEquals(2, leaderActorContext.getCommitIndex());
881 ApplyLogEntries applyLogEntries =
882 (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
883 ApplyLogEntries.class);
885 assertNotNull(applyLogEntries);
887 assertEquals(2, leaderActorContext.getLastApplied());
889 assertEquals(2, applyLogEntries.getToIndex());
891 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
894 assertEquals(1,applyStateList.size());
896 ApplyState applyState = (ApplyState) applyStateList.get(0);
898 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
904 public void testHandleAppendEntriesReplyUnknownFollower(){
905 new JavaTestKit(getSystem()) {
908 ActorRef leaderActor =
909 getSystem().actorOf(Props.create(MessageCollectorActor.class));
911 MockRaftActorContext leaderActorContext =
912 new MockRaftActorContext("leader", getSystem(), leaderActor);
914 Leader leader = new Leader(leaderActorContext);
916 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
918 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
920 assertEquals(RaftState.Leader, raftActorBehavior.state());
926 public void testHandleRequestVoteReply(){
927 new JavaTestKit(getSystem()) {
930 ActorRef leaderActor =
931 getSystem().actorOf(Props.create(MessageCollectorActor.class));
933 MockRaftActorContext leaderActorContext =
934 new MockRaftActorContext("leader", getSystem(), leaderActor);
936 Leader leader = new Leader(leaderActorContext);
938 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
940 assertEquals(RaftState.Leader, raftActorBehavior.state());
942 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
944 assertEquals(RaftState.Leader, raftActorBehavior.state());
949 public void testIsolatedLeaderCheckNoFollowers() {
950 new JavaTestKit(getSystem()) {{
951 ActorRef leaderActor = getTestActor();
953 MockRaftActorContext leaderActorContext =
954 new MockRaftActorContext("leader", getSystem(), leaderActor);
956 Map<String, String> peerAddresses = new HashMap<>();
957 leaderActorContext.setPeerAddresses(peerAddresses);
959 Leader leader = new Leader(leaderActorContext);
960 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
961 Assert.assertTrue(behavior instanceof Leader);
966 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
967 new JavaTestKit(getSystem()) {{
969 ActorRef followerActor1 = getTestActor();
970 ActorRef followerActor2 = getTestActor();
972 MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
974 Map<String, String> peerAddresses = new HashMap<>();
975 peerAddresses.put("follower-1", followerActor1.path().toString());
976 peerAddresses.put("follower-2", followerActor2.path().toString());
978 leaderActorContext.setPeerAddresses(peerAddresses);
980 Leader leader = new Leader(leaderActorContext);
981 leader.stopIsolatedLeaderCheckSchedule();
983 leader.markFollowerActive("follower-1");
984 leader.markFollowerActive("follower-2");
985 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
986 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
987 behavior instanceof Leader);
989 // kill 1 follower and verify if that got killed
990 final JavaTestKit probe = new JavaTestKit(getSystem());
991 probe.watch(followerActor1);
992 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
993 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
994 assertEquals(termMsg1.getActor(), followerActor1);
996 leader.markFollowerInActive("follower-1");
997 leader.markFollowerActive("follower-2");
998 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
999 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1000 behavior instanceof Leader);
1002 // kill 2nd follower and leader should change to Isolated leader
1003 followerActor2.tell(PoisonPill.getInstance(), null);
1004 probe.watch(followerActor2);
1005 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1006 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1007 assertEquals(termMsg2.getActor(), followerActor2);
1009 leader.markFollowerInActive("follower-2");
1010 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1011 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1012 behavior instanceof IsolatedLeader);
1017 class MockLeader extends Leader {
1019 FollowerToSnapshot fts;
1021 public MockLeader(RaftActorContext context){
1025 public FollowerToSnapshot getFollowerToSnapshot() {
1029 public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1030 fts = new FollowerToSnapshot(bs);
1031 mapFollowerToSnapshot.put(followerId, fts);
1036 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1038 private long electionTimeOutIntervalMillis;
1039 private int snapshotChunkSize;
1041 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1043 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1044 this.snapshotChunkSize = snapshotChunkSize;
1048 public FiniteDuration getElectionTimeOutInterval() {
1049 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1053 public int getSnapshotChunkSize() {
1054 return snapshotChunkSize;