+ @Override
+ protected MockRaftActorContext createActorContext() {
+ return createActorContext(leaderActor);
+ }
+
+ @Override
+ protected MockRaftActorContext createActorContext(ActorRef actorRef) {
+ return createActorContext("leader", actorRef);
+ }
+
+ private MockRaftActorContext createActorContextWithFollower() {
+ MockRaftActorContext actorContext = createActorContext();
+ actorContext.setPeerAddresses(ImmutableMap.<String,String>builder().put(FOLLOWER_ID,
+ followerActor.path().toString()).build());
+ return actorContext;
+ }
+
+ private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
+ configParams.setElectionTimeoutFactor(100000);
+ MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
+ context.setConfigParams(configParams);
+ return context;
+ }
+
+ private ByteString toByteString(Map<String, String> state) {
+ ByteArrayOutputStream b = null;
+ ObjectOutputStream o = null;
+ try {
+ try {
+ b = new ByteArrayOutputStream();
+ o = new ObjectOutputStream(b);
+ o.writeObject(state);
+ byte[] snapshotBytes = b.toByteArray();
+ return ByteString.copyFrom(snapshotBytes);
+ } finally {
+ if (o != null) {
+ o.flush();
+ o.close();
+ }
+ if (b != null) {
+ b.close();
+ }
+ }
+ } catch (IOException e) {
+ Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
+ }
+ return null;
+ }
+
+ public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
+ AbstractRaftActorBehavior behavior;
+
+ @Override public void onReceive(Object message) throws Exception {
+ if(behavior != null) {
+ behavior.handleMessage(sender(), message);
+ }
+
+ super.onReceive(message);
+ }
+
+ public static Props props() {
+ return Props.create(ForwardMessageToBehaviorActor.class);
+ }
+ }
+
+ @Test
+ public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
+ logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+
+ MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().behavior = follower;
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+
+ //create 3 entries
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ leaderActorContext.setCommitIndex(1);
+
+ followerActorContext.getReplicatedLog().removeFrom(0);
+
+ // follower too has the exact same log entries and has the same commit index
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ followerActorContext.setCommitIndex(1);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(0, appendEntries.getEntries().size());
+ assertEquals(0, appendEntries.getPrevLogIndex());
+
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
+ leaderActor, AppendEntriesReply.class);
+
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ // follower returns its next index
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ follower.close();
+ }
+
+ @Test
+ public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
+ logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+
+ MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().behavior = follower;
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ leaderActorContext.setCommitIndex(1);
+
+ followerActorContext.getReplicatedLog().removeFrom(0);
+
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ // follower has the same log entries but its commit index > leaders commit index
+ followerActorContext.setCommitIndex(2);
+
+ leader = new Leader(leaderActorContext);
+
+ // Initial heartbeat
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(0, appendEntries.getEntries().size());
+ assertEquals(0, appendEntries.getPrevLogIndex());
+
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
+ leaderActor, AppendEntriesReply.class);
+
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ leaderActor.underlyingActor().behavior = leader;
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ leaderActor.underlyingActor().clear();
+ followerActor.underlyingActor().clear();
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals(2, appendEntries.getLeaderCommit());
+ assertEquals(0, appendEntries.getEntries().size());
+ assertEquals(2, appendEntries.getPrevLogIndex());
+
+ appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ assertEquals(2, followerActorContext.getCommitIndex());
+
+ follower.close();
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyFailure(){
+ logStart("testHandleAppendEntriesReplyFailure");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+
+ leader = new Leader(leaderActorContext);
+
+ // Send initial heartbeat reply with last index.
+ leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
+
+ AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
+
+ RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+ assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplySuccess() throws Exception {
+ logStart("testHandleAppendEntriesReplySuccess");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ leaderActorContext.setCommitIndex(1);
+ leaderActorContext.setLastApplied(1);
+ leaderActorContext.getTermInformation().update(1, "leader");
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
+
+ RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+ assertEquals(2, leaderActorContext.getCommitIndex());
+
+ ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching(
+ leaderActor, ApplyLogEntries.class);
+
+ assertEquals(2, leaderActorContext.getLastApplied());
+
+ assertEquals(2, applyLogEntries.getToIndex());
+
+ List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
+ ApplyState.class);
+
+ assertEquals(1,applyStateList.size());
+
+ ApplyState applyState = applyStateList.get(0);
+
+ assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyUnknownFollower(){
+ logStart("testHandleAppendEntriesReplyUnknownFollower");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
+
+ RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+ }
+
+ @Test
+ public void testHandleRequestVoteReply(){
+ logStart("testHandleRequestVoteReply");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+
+ leader = new Leader(leaderActorContext);
+
+ // Should be a no-op.
+ RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
+ new RequestVoteReply(1, true));
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+ raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+ }
+
+ @Test
+ public void testIsolatedLeaderCheckNoFollowers() {
+ logStart("testIsolatedLeaderCheckNoFollowers");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+
+ leader = new Leader(leaderActorContext);
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue(behavior instanceof Leader);
+ }
+
+ @Test
+ public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
+ logStart("testIsolatedLeaderCheckTwoFollowers");
+
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef followerActor1 = getTestActor();
+ ActorRef followerActor2 = getTestActor();
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1", followerActor1.path().toString());
+ peerAddresses.put("follower-2", followerActor2.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ leader = new Leader(leaderActorContext);
+ leader.stopIsolatedLeaderCheckSchedule();
+
+ leader.markFollowerActive("follower-1");
+ leader.markFollowerActive("follower-2");
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue("Behavior not instance of Leader when all followers are active",
+ behavior instanceof Leader);
+
+ // kill 1 follower and verify if that got killed
+ final JavaTestKit probe = new JavaTestKit(getSystem());
+ probe.watch(followerActor1);
+ followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
+ assertEquals(termMsg1.getActor(), followerActor1);
+
+ leader.markFollowerInActive("follower-1");
+ leader.markFollowerActive("follower-2");
+ behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
+ behavior instanceof Leader);
+
+ // kill 2nd follower and leader should change to Isolated leader
+ followerActor2.tell(PoisonPill.getInstance(), null);
+ probe.watch(followerActor2);
+ followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
+ assertEquals(termMsg2.getActor(), followerActor2);
+
+ leader.markFollowerInActive("follower-2");
+ behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
+ behavior instanceof IsolatedLeader);
+ }};
+ }
+
+
+ @Test
+ public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
+ logStart("testAppendEntryCallAtEndofAppendEntryReply");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+ leaderActorContext.setConfigParams(configParams);
+
+ MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+
+ followerActorContext.setConfigParams(configParams);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().behavior = follower;
+
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
+
+ followerActorContext.getReplicatedLog().removeFrom(0);
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
+ leaderActor, AppendEntriesReply.class);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ // Clear initial heartbeat messages
+
+ leaderActor.underlyingActor().clear();
+ followerActor.underlyingActor().clear();
+
+ // create 3 entries
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ leaderActorContext.setCommitIndex(1);
+ leaderActorContext.setLastApplied(1);
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // Should send first log entry
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals(-1, appendEntries.getPrevLogIndex());
+
+ appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+ assertEquals(0, appendEntriesReply.getLogLastIndex());
+
+ followerActor.underlyingActor().clear();
+
+ leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // Should send second log entry
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+
+ follower.close();
+ }
+
+ private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
+
+ private final long electionTimeOutIntervalMillis;
+ private final int snapshotChunkSize;
+
+ public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
+ super();
+ this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
+ this.snapshotChunkSize = snapshotChunkSize;
+ }
+
+ @Override
+ public FiniteDuration getElectionTimeOutInterval() {
+ return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public int getSnapshotChunkSize() {
+ return snapshotChunkSize;
+ }