+ configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+ actorContext.setConfigParams(configParams);
+ actorContext.setPeerAddresses(peerAddresses);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ MockLeader leader = new MockLeader(actorContext);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+ List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ assertEquals(1, objectList.size());
+
+ Object o = objectList.get(0);
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ "follower-reply", installSnapshot.getChunkIndex(), true));
+
+ objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ assertEquals(2, objectList.size());
+
+ installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ "follower-reply", installSnapshot.getChunkIndex(), true));
+
+ objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ assertEquals(3, objectList.size());
+
+ installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
+
+ // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ "follower-reply", installSnapshot.getChunkIndex(), true));
+
+ objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ // Count should still stay at 3
+ assertEquals(3, objectList.size());
+ }};
+ }
+
+
+ @Test
+ public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
+ new JavaTestKit(getSystem()) {{
+
+ TestActorRef<MessageCollectorActor> followerActor =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+
+ actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ });
+ actorContext.setPeerAddresses(peerAddresses);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ MockLeader leader = new MockLeader(actorContext);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+ Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ followerActor.path().toString(), -1, false));
+
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
+
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+
+ followerActor.tell(PoisonPill.getInstance(), getRef());
+ }};
+ }
+
+ @Test
+ public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+
+ TestActorRef<MessageCollectorActor> followerActor =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ });
+ actorContext.setPeerAddresses(peerAddresses);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ MockLeader leader = new MockLeader(actorContext);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+ Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+ assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
+
+ int hashCode = installSnapshot.getData().hashCode();
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
+
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(2, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+ assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
+
+ followerActor.tell(PoisonPill.getInstance(), getRef());
+ }};
+ }
+
+ @Test
+ public void testFollowerToSnapshotLogic() {
+
+ MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
+
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ });
+
+ MockLeader leader = new MockLeader(actorContext);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ ByteString bs = toByteString(leadersSnapshot);
+ byte[] barray = bs.toByteArray();
+
+ leader.createFollowerToSnapshot("followerId", bs);
+ assertEquals(bs.size(), barray.length);
+
+ int chunkIndex=0;
+ for (int i=0; i < barray.length; i = i + 50) {
+ int j = i + 50;
+ chunkIndex++;
+
+ if (i + 50 > barray.length) {
+ j = barray.length;
+ }
+
+ ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
+ assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
+ assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
+
+ leader.getFollowerToSnapshot().markSendStatus(true);
+ if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
+ leader.getFollowerToSnapshot().incrementChunkIndex();
+ }
+ }
+
+ assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
+ }
+
+
+ @Override protected RaftActorBehavior createBehavior(
+ RaftActorContext actorContext) {
+ return new Leader(actorContext);
+ }
+
+ @Override protected RaftActorContext createActorContext() {
+ return createActorContext(leaderActor);
+ }
+
+ @Override
+ protected RaftActorContext createActorContext(ActorRef actorRef) {
+ return new MockRaftActorContext("test", getSystem(), actorRef);
+ }
+
+ private ByteString toByteString(Map<String, String> state) {
+ ByteArrayOutputStream b = null;
+ ObjectOutputStream o = null;
+ try {
+ try {
+ b = new ByteArrayOutputStream();
+ o = new ObjectOutputStream(b);
+ o.writeObject(state);
+ byte[] snapshotBytes = b.toByteArray();
+ return ByteString.copyFrom(snapshotBytes);
+ } finally {
+ if (o != null) {
+ o.flush();
+ o.close();
+ }
+ if (b != null) {
+ b.close();
+ }
+ }
+ } catch (IOException e) {
+ Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
+ }
+ return null;
+ }
+
+ public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
+ private static AbstractRaftActorBehavior behavior;
+
+ public ForwardMessageToBehaviorActor(){
+
+ }
+
+ @Override public void onReceive(Object message) throws Exception {
+ super.onReceive(message);
+ behavior.handleMessage(sender(), message);
+ }
+
+ public static void setBehavior(AbstractRaftActorBehavior behavior){
+ ForwardMessageToBehaviorActor.behavior = behavior;
+ }
+ }
+
+ @Test
+ public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+
+ MockRaftActorContext followerActorContext =
+ new MockRaftActorContext("follower", getSystem(), followerActor);
+
+ Follower follower = new Follower(followerActorContext);
+
+ ForwardMessageToBehaviorActor.setBehavior(follower);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+
+ //create 3 entries
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ leaderActorContext.setCommitIndex(1);
+
+ followerActorContext.getReplicatedLog().removeFrom(0);
+
+ // follower too has the exact same log entries and has the same commit index
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ followerActorContext.setCommitIndex(1);
+
+ Leader leader = new Leader(leaderActorContext);
+ leader.markFollowerActive(followerActor.path().toString());
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
+ .getFirstMatching(followerActor, AppendEntries.class);
+
+ assertNotNull(appendEntries);
+
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals(0, appendEntries.getPrevLogIndex());
+
+ AppendEntriesReply appendEntriesReply =
+ (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+ leaderActor, AppendEntriesReply.class);
+
+ assertNotNull(appendEntriesReply);
+
+ // follower returns its next index
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ }};
+ }
+
+
+ @Test
+ public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ ActorRef followerActor = getSystem().actorOf(
+ Props.create(ForwardMessageToBehaviorActor.class));
+
+ MockRaftActorContext followerActorContext =
+ new MockRaftActorContext("follower", getSystem(), followerActor);
+
+ Follower follower = new Follower(followerActorContext);
+
+ ForwardMessageToBehaviorActor.setBehavior(follower);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ leaderActorContext.setCommitIndex(1);
+
+ followerActorContext.getReplicatedLog().removeFrom(0);
+
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ // follower has the same log entries but its commit index > leaders commit index
+ followerActorContext.setCommitIndex(2);
+
+ Leader leader = new Leader(leaderActorContext);
+ leader.markFollowerActive(followerActor.path().toString());
+
+ Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
+ .getFirstMatching(followerActor, AppendEntries.class);
+
+ assertNotNull(appendEntries);
+
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals(0, appendEntries.getPrevLogIndex());
+
+ AppendEntriesReply appendEntriesReply =
+ (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+ leaderActor, AppendEntriesReply.class);
+
+ assertNotNull(appendEntriesReply);
+
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ }};
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyFailure(){
+ new JavaTestKit(getSystem()) {
+ {
+
+ ActorRef leaderActor =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ ActorRef followerActor =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1",
+ followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ Leader leader = new Leader(leaderActorContext);
+
+ AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
+
+ RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+ }};
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplySuccess() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+
+ ActorRef leaderActor =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ ActorRef followerActor =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1",
+ followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+ leaderActorContext.setCommitIndex(1);
+ leaderActorContext.setLastApplied(1);
+ leaderActorContext.getTermInformation().update(1, "leader");
+
+ Leader leader = new Leader(leaderActorContext);
+
+ AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
+
+ RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+ assertEquals(2, leaderActorContext.getCommitIndex());
+
+ ApplyLogEntries applyLogEntries =
+ (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
+ ApplyLogEntries.class);
+
+ assertNotNull(applyLogEntries);
+
+ assertEquals(2, leaderActorContext.getLastApplied());
+
+ assertEquals(2, applyLogEntries.getToIndex());
+
+ List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
+ ApplyState.class);
+
+ assertEquals(1,applyStateList.size());
+
+ ApplyState applyState = (ApplyState) applyStateList.get(0);
+
+ assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
+
+ }};
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyUnknownFollower(){
+ new JavaTestKit(getSystem()) {
+ {
+
+ ActorRef leaderActor =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ Leader leader = new Leader(leaderActorContext);
+
+ AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
+
+ RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+ }};
+ }
+
+ @Test
+ public void testHandleRequestVoteReply(){
+ new JavaTestKit(getSystem()) {
+ {
+
+ ActorRef leaderActor =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ Leader leader = new Leader(leaderActorContext);
+
+ RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+ raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+ }};
+ }
+
+ @Test
+ public void testIsolatedLeaderCheckNoFollowers() {
+ new JavaTestKit(getSystem()) {{
+ ActorRef leaderActor = getTestActor();
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ Leader leader = new Leader(leaderActorContext);
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue(behavior instanceof Leader);
+ }};
+ }
+
+ @Test
+ public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef followerActor1 = getTestActor();
+ ActorRef followerActor2 = getTestActor();
+
+ MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1", followerActor1.path().toString());
+ peerAddresses.put("follower-2", followerActor2.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ Leader leader = new Leader(leaderActorContext);
+ leader.stopIsolatedLeaderCheckSchedule();
+
+ leader.markFollowerActive("follower-1");
+ leader.markFollowerActive("follower-2");
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue("Behavior not instance of Leader when all followers are active",
+ behavior instanceof Leader);
+
+ // kill 1 follower and verify if that got killed
+ final JavaTestKit probe = new JavaTestKit(getSystem());
+ probe.watch(followerActor1);
+ followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
+ assertEquals(termMsg1.getActor(), followerActor1);
+
+ leader.markFollowerInActive("follower-1");
+ leader.markFollowerActive("follower-2");
+ behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
+ behavior instanceof Leader);
+
+ // kill 2nd follower and leader should change to Isolated leader
+ followerActor2.tell(PoisonPill.getInstance(), null);
+ probe.watch(followerActor2);
+ followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
+ assertEquals(termMsg2.getActor(), followerActor2);
+
+ leader.markFollowerInActive("follower-2");
+ behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
+ behavior instanceof IsolatedLeader);
+