+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1",
+ followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ Leader leader = new Leader(leaderActorContext);
+
+ AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
+
+ RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+ }};
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplySuccess() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+
+ ActorRef leaderActor =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ ActorRef followerActor =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1",
+ followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+ leaderActorContext.setCommitIndex(1);
+ leaderActorContext.setLastApplied(1);
+ leaderActorContext.getTermInformation().update(1, "leader");
+
+ Leader leader = new Leader(leaderActorContext);
+
+ AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
+
+ RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+ assertEquals(2, leaderActorContext.getCommitIndex());
+
+ ApplyLogEntries applyLogEntries =
+ MessageCollectorActor.getFirstMatching(leaderActor,
+ ApplyLogEntries.class);
+
+ assertNotNull(applyLogEntries);
+
+ assertEquals(2, leaderActorContext.getLastApplied());
+
+ assertEquals(2, applyLogEntries.getToIndex());
+
+ List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
+ ApplyState.class);
+
+ assertEquals(1,applyStateList.size());
+
+ ApplyState applyState = (ApplyState) applyStateList.get(0);
+
+ assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
+
+ }};
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyUnknownFollower(){
+ new JavaTestKit(getSystem()) {
+ {
+
+ ActorRef leaderActor =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ Leader leader = new Leader(leaderActorContext);
+
+ AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
+
+ RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+ }};
+ }
+
+ @Test
+ public void testHandleRequestVoteReply(){
+ new JavaTestKit(getSystem()) {
+ {
+
+ ActorRef leaderActor =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ Leader leader = new Leader(leaderActorContext);
+
+ RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+ raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+ }};
+ }
+
+ @Test
+ public void testIsolatedLeaderCheckNoFollowers() {
+ new JavaTestKit(getSystem()) {{
+ ActorRef leaderActor = getTestActor();
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ Leader leader = new Leader(leaderActorContext);
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue(behavior instanceof Leader);
+ }};
+ }
+
+ @Test
+ public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef followerActor1 = getTestActor();
+ ActorRef followerActor2 = getTestActor();
+
+ MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1", followerActor1.path().toString());
+ peerAddresses.put("follower-2", followerActor2.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ Leader leader = new Leader(leaderActorContext);
+ leader.stopIsolatedLeaderCheckSchedule();
+
+ leader.markFollowerActive("follower-1");
+ leader.markFollowerActive("follower-2");
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue("Behavior not instance of Leader when all followers are active",
+ behavior instanceof Leader);
+
+ // kill 1 follower and verify if that got killed
+ final JavaTestKit probe = new JavaTestKit(getSystem());
+ probe.watch(followerActor1);
+ followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
+ assertEquals(termMsg1.getActor(), followerActor1);
+
+ leader.markFollowerInActive("follower-1");
+ leader.markFollowerActive("follower-2");
+ behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
+ behavior instanceof Leader);
+
+ // kill 2nd follower and leader should change to Isolated leader
+ followerActor2.tell(PoisonPill.getInstance(), null);
+ probe.watch(followerActor2);
+ followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
+ assertEquals(termMsg2.getActor(), followerActor2);
+
+ leader.markFollowerInActive("follower-2");
+ behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
+ behavior instanceof IsolatedLeader);
+
+ }};
+ }
+
+
+ @Test
+ public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ TestActorRef<MessageCollectorActor> leaderActor = TestActorRef.create(getSystem(),
+ Props.create(MessageCollectorActor.class));
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+ leaderActorContext.setConfigParams(configParams);
+
+ TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
+ ForwardMessageToBehaviorActor.props());
+
+ MockRaftActorContext followerActorContext =
+ new MockRaftActorContext("follower-reply", getSystem(), followerActor);
+
+ followerActorContext.setConfigParams(configParams);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().behavior = follower;
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-reply",
+ followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
+
+ followerActorContext.getReplicatedLog().removeFrom(0);
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
+
+ Leader leader = new Leader(leaderActorContext);
+
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
+ leaderActor, AppendEntriesReply.class);
+ assertNotNull(appendEntriesReply);
+ System.out.println("appendEntriesReply: "+appendEntriesReply);
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ // Clear initial heartbeat messages
+
+ leaderActor.underlyingActor().clear();
+ followerActor.underlyingActor().clear();
+
+ // create 3 entries
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ leaderActorContext.setCommitIndex(1);
+ leaderActorContext.setLastApplied(1);
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
+ assertNotNull(appendEntries);
+
+ // Should send first log entry
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals(-1, appendEntries.getPrevLogIndex());
+
+ appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
+ assertNotNull(appendEntriesReply);
+
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+ assertEquals(0, appendEntriesReply.getLogLastIndex());
+
+ followerActor.underlyingActor().clear();
+
+ leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
+
+ appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
+ assertNotNull(appendEntries);
+
+ // Should send second log entry
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+ }};