Fix bug in ReplicatedLogImpl#removeFrom and use akka-persistence for removing entries
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
index d93271072ce60f86747a5ef6ff19f825ce1473c5..74069a18e1f879e71040a171eef8497889fd0f8c 100644 (file)
@@ -11,21 +11,24 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import akka.actor.ActorRef;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
 /**
  * The behavior of a RaftActor in the Follower state
- *
+ * <p/>
  * <ul>
  * <li> Respond to RPCs from candidates and leaders
  * <li> If election timeout elapses without receiving AppendEntries
  * RPC from current leader or granting vote to candidate:
  * convert to candidate
  * </ul>
- *
  */
 public class Follower extends AbstractRaftActorBehavior {
     public Follower(RaftActorContext context) {
@@ -35,31 +38,180 @@ public class Follower extends AbstractRaftActorBehavior {
     }
 
     @Override protected RaftState handleAppendEntries(ActorRef sender,
-        AppendEntries appendEntries, RaftState suggestedState) {
-        return suggestedState;
+        AppendEntries appendEntries) {
+
+        // If we got here then we do appear to be talking to the leader
+        leaderId = appendEntries.getLeaderId();
+
+        // 2. Reply false if log doesn’t contain an entry at prevLogIndex
+        // whose term matches prevLogTerm (§5.3)
+        ReplicatedLogEntry previousEntry = context.getReplicatedLog()
+            .get(appendEntries.getPrevLogIndex());
+
+
+        boolean noMatchingTerms = true;
+
+        if (lastIndex() == -1
+            && appendEntries.getPrevLogIndex() != -1) {
+
+            context.getLogger().debug(
+                "The followers log is empty and the senders prevLogIndex is {}",
+                appendEntries.getPrevLogIndex());
+
+        } else if (lastIndex() > -1
+            && appendEntries.getPrevLogIndex() != -1
+            && previousEntry == null) {
+
+            context.getLogger().debug(
+                "The log is not empty but the prevLogIndex {} was not found in it",
+                appendEntries.getPrevLogIndex());
+
+        } else if (lastIndex() > -1
+            && previousEntry != null
+            && previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) {
+
+            context.getLogger().debug(
+                "Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
+                , previousEntry.getTerm()
+                , appendEntries.getPrevLogTerm());
+        } else {
+            noMatchingTerms = false;
+        }
+
+        if (noMatchingTerms) {
+            sender.tell(
+                new AppendEntriesReply(context.getId(), currentTerm(), false,
+                    lastIndex(), lastTerm()), actor()
+            );
+            return state();
+        }
+
+        if (appendEntries.getEntries() != null
+            && appendEntries.getEntries().size() > 0) {
+            context.getLogger().debug(
+                "Number of entries to be appended = " + appendEntries
+                    .getEntries().size()
+            );
+
+            // 3. If an existing entry conflicts with a new one (same index
+            // but different terms), delete the existing entry and all that
+            // follow it (§5.3)
+            int addEntriesFrom = 0;
+            if (context.getReplicatedLog().size() > 0) {
+                for (int i = 0;
+                     i < appendEntries.getEntries()
+                         .size(); i++, addEntriesFrom++) {
+                    ReplicatedLogEntry matchEntry =
+                        appendEntries.getEntries().get(i);
+                    ReplicatedLogEntry newEntry = context.getReplicatedLog()
+                        .get(matchEntry.getIndex());
+
+                    if (newEntry == null) {
+                        //newEntry not found in the log
+                        break;
+                    }
+
+                    if (newEntry != null && newEntry.getTerm() == matchEntry
+                        .getTerm()) {
+                        continue;
+                    }
+                    if (newEntry != null && newEntry.getTerm() != matchEntry
+                        .getTerm()) {
+                        context.getLogger().debug(
+                            "Removing entries from log starting at "
+                                + matchEntry.getIndex()
+                        );
+                        context.getReplicatedLog()
+                            .removeFromAndPersist(matchEntry.getIndex());
+                        break;
+                    }
+                }
+            }
+
+            context.getLogger().debug(
+                "After cleanup entries to be added from = " + (addEntriesFrom
+                    + lastIndex())
+            );
+
+            // 4. Append any new entries not already in the log
+            for (int i = addEntriesFrom;
+                 i < appendEntries.getEntries().size(); i++) {
+                context.getLogger().debug(
+                    "Append entry to log " + appendEntries.getEntries().get(i).getData()
+                        .toString()
+                );
+                context.getReplicatedLog()
+                    .appendAndPersist(appendEntries.getEntries().get(i));
+            }
+
+            context.getLogger().debug(
+                "Log size is now " + context.getReplicatedLog().size());
+        }
+
+
+        // 5. If leaderCommit > commitIndex, set commitIndex =
+        // min(leaderCommit, index of last new entry)
+
+        long prevCommitIndex = context.getCommitIndex();
+
+        context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(),
+            context.getReplicatedLog().lastIndex()));
+
+        if (prevCommitIndex != context.getCommitIndex()) {
+            context.getLogger()
+                .debug("Commit index set to " + context.getCommitIndex());
+        }
+
+        // If commitIndex > lastApplied: increment lastApplied, apply
+        // log[lastApplied] to state machine (§5.3)
+        if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
+            applyLogToStateMachine(appendEntries.getLeaderCommit());
+        }
+
+        sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
+            lastIndex(), lastTerm()), actor());
+
+        return state();
     }
 
     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
-        AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
-        return suggestedState;
+        AppendEntriesReply appendEntriesReply) {
+        return state();
     }
 
     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
-        RequestVoteReply requestVoteReply, RaftState suggestedState) {
-        return suggestedState;
+        RequestVoteReply requestVoteReply) {
+        return state();
     }
 
-    @Override protected RaftState state() {
+    @Override public RaftState state() {
         return RaftState.Follower;
     }
 
     @Override public RaftState handleMessage(ActorRef sender, Object message) {
-        if(message instanceof ElectionTimeout){
+        if (message instanceof RaftRPC) {
+            RaftRPC rpc = (RaftRPC) message;
+            // If RPC request or response contains term T > currentTerm:
+            // set currentTerm = T, convert to follower (§5.1)
+            // This applies to all RPC messages and responses
+            if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+            }
+        }
+
+        if (message instanceof ElectionTimeout) {
             return RaftState.Candidate;
+        } else if (message instanceof InstallSnapshot) {
+            InstallSnapshot snapshot = (InstallSnapshot) message;
+            actor().tell(new ApplySnapshot(snapshot), actor());
         }
 
         scheduleElection(electionDuration());
 
         return super.handleMessage(sender, message);
     }
+
+    @Override public void close() throws Exception {
+        stopElection();
+    }
 }