- @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
- AppendEntriesReply appendEntriesReply) {
-
- if(! appendEntriesReply.isSuccess()) {
- if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntriesReply.toString());
- }
- }
-
- // Update the FollowerLogInformation
- String followerId = appendEntriesReply.getFollowerId();
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- if(followerLogInformation == null){
- LOG.error("Unknown follower {}", followerId);
- return this;
- }
-
- if (appendEntriesReply.isSuccess()) {
- followerLogInformation
- .setMatchIndex(appendEntriesReply.getLogLastIndex());
- followerLogInformation
- .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
- } else {
-
- // TODO: When we find that the follower is out of sync with the
- // Leader we simply decrement that followers next index by 1.
- // Would it be possible to do better than this? The RAFT spec
- // does not explicitly deal with it but may be something for us to
- // think about
-
- followerLogInformation.decrNextIndex();
- }
-
- // Now figure out if this reply warrants a change in the commitIndex
- // If there exists an N such that N > commitIndex, a majority
- // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
- // set commitIndex = N (§5.3, §5.4).
- for (long N = context.getCommitIndex() + 1; ; N++) {
- int replicatedCount = 1;
-
- for (FollowerLogInformation info : followerToLog.values()) {
- if (info.getMatchIndex().get() >= N) {
- replicatedCount++;
- }
- }
-
- if (replicatedCount >= minReplicationCount) {
- ReplicatedLogEntry replicatedLogEntry =
- context.getReplicatedLog().get(N);
- if (replicatedLogEntry != null
- && replicatedLogEntry.getTerm()
- == currentTerm()) {
- context.setCommitIndex(N);
- }
- } else {
- break;
- }
- }
-
- // Apply the change to the state machine
- if (context.getCommitIndex() > context.getLastApplied()) {
- applyLogToStateMachine(context.getCommitIndex());
- }
-
- return this;
- }
-
- protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
-
- ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
- if(toRemove != null) {
- trackerList.remove(toRemove);
- }
-
- return toRemove;
- }
-
- protected ClientRequestTracker findClientRequestTracker(long logIndex) {
- for (ClientRequestTracker tracker : trackerList) {
- if (tracker.getIndex() == logIndex) {
- return tracker;
- }
- }
-
- return null;
- }
-
- @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
- RequestVoteReply requestVoteReply) {
- return this;
- }
-
- @Override public RaftState state() {
- return RaftState.Leader;
- }
-
- @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {