+ return createActorContext(leaderActor);
+ }
+
+ protected RaftActorContext createActorContext(ActorRef actorRef) {
+ return new MockRaftActorContext("test", getSystem(), actorRef);
+ }
+
+ private ByteString toByteString(Map<String, String> state) {
+ ByteArrayOutputStream b = null;
+ ObjectOutputStream o = null;
+ try {
+ try {
+ b = new ByteArrayOutputStream();
+ o = new ObjectOutputStream(b);
+ o.writeObject(state);
+ byte[] snapshotBytes = b.toByteArray();
+ return ByteString.copyFrom(snapshotBytes);
+ } finally {
+ if (o != null) {
+ o.flush();
+ o.close();
+ }
+ if (b != null) {
+ b.close();
+ }
+ }
+ } catch (IOException e) {
+ Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
+ }
+ return null;
+ }
+
+ public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
+ private static AbstractRaftActorBehavior behavior;
+
+ public ForwardMessageToBehaviorActor(){
+
+ }
+
+ @Override public void onReceive(Object message) throws Exception {
+ super.onReceive(message);
+ behavior.handleMessage(sender(), message);
+ }
+
+ public static void setBehavior(AbstractRaftActorBehavior behavior){
+ ForwardMessageToBehaviorActor.behavior = behavior;
+ }
+ }
+
+ @Test
+ public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+
+ MockRaftActorContext followerActorContext =
+ new MockRaftActorContext("follower", getSystem(), followerActor);
+
+ Follower follower = new Follower(followerActorContext);
+
+ ForwardMessageToBehaviorActor.setBehavior(follower);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+
+ //create 3 entries
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ leaderActorContext.setCommitIndex(1);
+
+ followerActorContext.getReplicatedLog().removeFrom(0);
+
+ // follower too has the exact same log entries and has the same commit index
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ followerActorContext.setCommitIndex(1);
+
+ Leader leader = new Leader(leaderActorContext);
+ leader.markFollowerActive(followerActor.path().toString());
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ AppendEntriesMessages.AppendEntries appendEntries =
+ (AppendEntriesMessages.AppendEntries) MessageCollectorActor
+ .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+
+ assertNotNull(appendEntries);
+
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+ assertEquals(0, appendEntries.getPrevLogIndex());
+
+ AppendEntriesReply appendEntriesReply =
+ (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+ leaderActor, AppendEntriesReply.class);
+
+ assertNotNull(appendEntriesReply);
+
+ // follower returns its next index
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ }};
+ }
+
+
+ @Test
+ public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ ActorRef followerActor = getSystem().actorOf(
+ Props.create(ForwardMessageToBehaviorActor.class));
+
+ MockRaftActorContext followerActorContext =
+ new MockRaftActorContext("follower", getSystem(), followerActor);
+
+ Follower follower = new Follower(followerActorContext);
+
+ ForwardMessageToBehaviorActor.setBehavior(follower);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ leaderActorContext.setCommitIndex(1);
+
+ followerActorContext.getReplicatedLog().removeFrom(0);
+
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ // follower has the same log entries but its commit index > leaders commit index
+ followerActorContext.setCommitIndex(2);
+
+ Leader leader = new Leader(leaderActorContext);
+ leader.markFollowerActive(followerActor.path().toString());
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ AppendEntriesMessages.AppendEntries appendEntries =
+ (AppendEntriesMessages.AppendEntries) MessageCollectorActor
+ .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+
+ assertNotNull(appendEntries);
+
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+ assertEquals(0, appendEntries.getPrevLogIndex());
+
+ AppendEntriesReply appendEntriesReply =
+ (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+ leaderActor, AppendEntriesReply.class);
+
+ assertNotNull(appendEntriesReply);
+
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ }};
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyFailure(){
+ new JavaTestKit(getSystem()) {
+ {
+
+ ActorRef leaderActor =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ ActorRef followerActor =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1",
+ followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ Leader leader = new Leader(leaderActorContext);
+
+ AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
+
+ RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+ }};
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplySuccess() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+
+ ActorRef leaderActor =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ ActorRef followerActor =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1",
+ followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+ leaderActorContext.setCommitIndex(1);
+ leaderActorContext.setLastApplied(1);
+ leaderActorContext.getTermInformation().update(1, "leader");
+
+ Leader leader = new Leader(leaderActorContext);
+
+ AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
+
+ RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+ assertEquals(2, leaderActorContext.getCommitIndex());
+
+ ApplyLogEntries applyLogEntries =
+ (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
+ ApplyLogEntries.class);
+
+ assertNotNull(applyLogEntries);
+
+ assertEquals(2, leaderActorContext.getLastApplied());
+
+ assertEquals(2, applyLogEntries.getToIndex());
+
+ List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
+ ApplyState.class);
+
+ assertEquals(1,applyStateList.size());
+
+ ApplyState applyState = (ApplyState) applyStateList.get(0);
+
+ assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
+
+ }};
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyUnknownFollower(){
+ new JavaTestKit(getSystem()) {
+ {
+
+ ActorRef leaderActor =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ Leader leader = new Leader(leaderActorContext);
+
+ AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
+
+ RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+ }};
+ }
+
+ @Test
+ public void testHandleRequestVoteReply(){
+ new JavaTestKit(getSystem()) {
+ {
+
+ ActorRef leaderActor =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ Leader leader = new Leader(leaderActorContext);
+
+ RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+ raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+
+ }};
+
+ }
+
+ class MockLeader extends Leader {
+
+ FollowerToSnapshot fts;
+
+ public MockLeader(RaftActorContext context){
+ super(context);
+ }
+
+ public FollowerToSnapshot getFollowerToSnapshot() {
+ return fts;
+ }
+
+ public void createFollowerToSnapshot(String followerId, ByteString bs ) {
+ fts = new FollowerToSnapshot(bs);
+ mapFollowerToSnapshot.put(followerId, fts);
+
+ }
+ }
+
+ private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
+
+ private long electionTimeOutIntervalMillis;
+ private int snapshotChunkSize;
+
+ public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
+ super();
+ this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
+ this.snapshotChunkSize = snapshotChunkSize;
+ }
+
+ @Override
+ public FiniteDuration getElectionTimeOutInterval() {
+ return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public int getSnapshotChunkSize() {
+ return snapshotChunkSize;
+ }