+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ Leader leader = new Leader(leaderActorContext);
+ leader.stopIsolatedLeaderCheckSchedule();
+
+ leader.markFollowerActive("follower-1");
+ leader.markFollowerActive("follower-2");
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue("Behavior not instance of Leader when all followers are active",
+ behavior instanceof Leader);
+
+ // kill 1 follower and verify if that got killed
+ final JavaTestKit probe = new JavaTestKit(getSystem());
+ probe.watch(followerActor1);
+ followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
+ assertEquals(termMsg1.getActor(), followerActor1);
+
+ leader.markFollowerInActive("follower-1");
+ leader.markFollowerActive("follower-2");
+ behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
+ behavior instanceof Leader);
+
+ // kill 2nd follower and leader should change to Isolated leader
+ followerActor2.tell(PoisonPill.getInstance(), null);
+ probe.watch(followerActor2);
+ followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
+ assertEquals(termMsg2.getActor(), followerActor2);
+
+ leader.markFollowerInActive("follower-2");
+ behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
+ behavior instanceof IsolatedLeader);
+
+ }};
+ }
+
+
+ @Test
+ public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+ leaderActorContext.setConfigParams(configParams);
+
+ ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+
+ MockRaftActorContext followerActorContext =
+ new MockRaftActorContext("follower-reply", getSystem(), followerActor);
+
+ followerActorContext.setConfigParams(configParams);
+
+ Follower follower = new Follower(followerActorContext);
+
+ ForwardMessageToBehaviorActor.setBehavior(follower);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-reply",
+ followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+
+ //create 3 entries
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ leaderActorContext.setCommitIndex(1);
+
+ Leader leader = new Leader(leaderActorContext);
+ leader.markFollowerActive("follower-reply");
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
+ .getFirstMatching(followerActor, AppendEntries.class);
+
+ assertNotNull(appendEntries);
+
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals(0, appendEntries.getPrevLogIndex());
+
+ AppendEntriesReply appendEntriesReply =
+ (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertNotNull(appendEntriesReply);
+
+ leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
+
+ List<Object> entries = ForwardMessageToBehaviorActor
+ .getAllMatching(followerActor, AppendEntries.class);
+
+ assertEquals("AppendEntries count should be 2 ", 2, entries.size());
+
+ AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
+
+ assertEquals(1, appendEntriesSecond.getLeaderCommit());
+ assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
+ assertEquals(1, appendEntriesSecond.getPrevLogIndex());
+
+ }};