+ @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
+ AppendEntries appendEntries) {
+
+ if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(appendEntries.toString());
+ }
+ }
+
+ // TODO : Refactor this method into a bunch of smaller methods
+ // to make it easier to read. Before refactoring ensure tests
+ // cover the code properly
+
+ // 1. Reply false if term < currentTerm (§5.1)
+ // This is handled in the appendEntries method of the base class
+
+ // If we got here then we do appear to be talking to the leader
+ leaderId = appendEntries.getLeaderId();
+
+ // 2. Reply false if log doesn’t contain an entry at prevLogIndex
+ // whose term matches prevLogTerm (§5.3)
+
+ ReplicatedLogEntry previousEntry = context.getReplicatedLog()
+ .get(appendEntries.getPrevLogIndex());
+
+
+ boolean outOfSync = true;
+
+ // First check if the logs are in sync or not
+ if (lastIndex() == -1
+ && appendEntries.getPrevLogIndex() != -1) {
+
+ // The follower's log is out of sync because the leader does have
+ // an entry at prevLogIndex and this follower has no entries in
+ // it's log.
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("The followers log is empty and the senders prevLogIndex is {}",
+ appendEntries.getPrevLogIndex());
+ }
+
+ } else if (lastIndex() > -1
+ && appendEntries.getPrevLogIndex() != -1
+ && previousEntry == null) {
+
+ // The follower's log is out of sync because the Leader's
+ // prevLogIndex entry was not found in it's log
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("The log is not empty but the prevLogIndex {} was not found in it",
+ appendEntries.getPrevLogIndex());
+ }
+
+ } else if (lastIndex() > -1
+ && previousEntry != null
+ && previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) {
+
+ // The follower's log is out of sync because the Leader's
+ // prevLogIndex entry does exist in the follower's log but it has
+ // a different term in it
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
+ , previousEntry.getTerm()
+ , appendEntries.getPrevLogTerm());
+ }
+ } else {
+ outOfSync = false;
+ }
+
+ if (outOfSync) {
+ // We found that the log was out of sync so just send a negative
+ // reply and return
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Follower is out-of-sync, " +
+ "so sending negative reply, lastIndex():{}, lastTerm():{}",
+ lastIndex(), lastTerm()
+ );
+ }
+ sender.tell(
+ new AppendEntriesReply(context.getId(), currentTerm(), false,
+ lastIndex(), lastTerm()), actor()
+ );
+ return this;
+ }
+
+ if (appendEntries.getEntries() != null
+ && appendEntries.getEntries().size() > 0) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Number of entries to be appended = {}", appendEntries.getEntries().size()
+ );
+ }
+
+ // 3. If an existing entry conflicts with a new one (same index
+ // but different terms), delete the existing entry and all that
+ // follow it (§5.3)
+ int addEntriesFrom = 0;
+ if (context.getReplicatedLog().size() > 0) {
+
+ // Find the entry up until which the one that is not in the follower's log
+ for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
+ ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i);
+ ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex());
+
+ if (newEntry == null) {
+ //newEntry not found in the log
+ break;
+ }
+
+ if (newEntry.getTerm() == matchEntry
+ .getTerm()) {
+ continue;
+ }
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Removing entries from log starting at {}", matchEntry.getIndex()
+ );
+ }
+
+ // Entries do not match so remove all subsequent entries
+ context.getReplicatedLog()
+ .removeFromAndPersist(matchEntry.getIndex());
+ break;
+ }
+ }
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("After cleanup entries to be added from = {}", (addEntriesFrom + lastIndex())
+ );
+ }
+
+ // 4. Append any new entries not already in the log
+ for (int i = addEntriesFrom;
+ i < appendEntries.getEntries().size(); i++) {
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Append entry to log {}", appendEntries.getEntries().get(i).getData());
+ }
+ context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i));
+ }
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Log size is now {}", context.getReplicatedLog().size());
+ }
+ }
+
+
+ // 5. If leaderCommit > commitIndex, set commitIndex =
+ // min(leaderCommit, index of last new entry)
+
+ long prevCommitIndex = context.getCommitIndex();
+
+ context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(),
+ context.getReplicatedLog().lastIndex()));
+
+ if (prevCommitIndex != context.getCommitIndex()) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Commit index set to {}", context.getCommitIndex());
+ }
+ }
+
+ // If commitIndex > lastApplied: increment lastApplied, apply
+ // log[lastApplied] to state machine (§5.3)
+ // check if there are any entries to be applied. last-applied can be equal to last-index
+ if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
+ context.getLastApplied() < lastIndex()) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("applyLogToStateMachine, " +
+ "appendEntries.getLeaderCommit():{}," +
+ "context.getLastApplied():{}, lastIndex():{}",
+ appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex()
+ );
+ }
+
+ applyLogToStateMachine(appendEntries.getLeaderCommit());
+ }
+
+ sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
+ lastIndex(), lastTerm()), actor());
+
+ return this;