Implement handling of AppendEntries from a recipient perspective
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractRaftActorBehavior.java
index ae1baec471ef1d08abdcbb6d9c56052b95a87ffa..167082711d82b46c61e90e8311473d051e06461c 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.Cancellable;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -45,10 +46,21 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      */
     private static final int ELECTION_TIME_MAX_VARIANCE = 100;
 
+    /**
+     * The interval at which a heart beat message will be sent to the remote
+     * RaftActor
+     * <p/>
+     * Since this is set to 100 milliseconds the Election timeout should be
+     * at least 200 milliseconds
+     */
+    protected static final FiniteDuration HEART_BEAT_INTERVAL =
+        new FiniteDuration(100, TimeUnit.MILLISECONDS);
+
     /**
      * The interval in which a new election would get triggered if no leader is found
      */
-    private static final long ELECTION_TIME_INTERVAL = Leader.HEART_BEAT_INTERVAL.toMillis() * 2;
+    private static final long ELECTION_TIME_INTERVAL =
+        HEART_BEAT_INTERVAL.toMillis() * 2;
 
     /**
      *
@@ -79,6 +91,71 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     protected abstract RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries, RaftState suggestedState);
 
+
+    protected RaftState appendEntries(ActorRef sender,
+        AppendEntries appendEntries, RaftState raftState){
+
+        // 1. Reply false if term < currentTerm (§5.1)
+        if(appendEntries.getTerm() < currentTerm()){
+            sender.tell(new AppendEntriesReply(currentTerm(), false), actor());
+            return state();
+        }
+
+        // 2. Reply false if log doesn’t contain an entry at prevLogIndex
+        // whose term matches prevLogTerm (§5.3)
+        ReplicatedLogEntry previousEntry = context.getReplicatedLog()
+            .get(appendEntries.getPrevLogIndex());
+
+        if(previousEntry == null || previousEntry.getTerm() != appendEntries.getPrevLogTerm()){
+            sender.tell(new AppendEntriesReply(currentTerm(), false), actor());
+            return state();
+        }
+
+        if(appendEntries.getEntries() != null) {
+            // 3. If an existing entry conflicts with a new one (same index
+            // but different terms), delete the existing entry and all that
+            // follow it (§5.3)
+            int addEntriesFrom = 0;
+            for (int i = 0;
+                 i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
+                ReplicatedLogEntry newEntry = context.getReplicatedLog()
+                    .get(i + 1);
+
+                if (newEntry != null && newEntry.getTerm() == appendEntries.getEntries().get(i).getTerm()){
+                    break;
+                }
+                if (newEntry != null && newEntry.getTerm() != appendEntries
+                    .getEntries().get(i).getTerm()) {
+                    context.getReplicatedLog().removeFrom(i + 1);
+                    break;
+                }
+            }
+
+            // 4. Append any new entries not already in the log
+            for (int i = addEntriesFrom;
+                 i < appendEntries.getEntries().size(); i++) {
+                context.getReplicatedLog()
+                    .append(appendEntries.getEntries().get(i));
+            }
+        }
+
+
+        // 5. If leaderCommit > commitIndex, set commitIndex =
+        // min(leaderCommit, index of last new entry)
+        context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(),
+            context.getReplicatedLog().last().getIndex()));
+
+        // If commitIndex > lastApplied: increment lastApplied, apply
+        // log[lastApplied] to state machine (§5.3)
+        if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
+            applyLogToStateMachine(appendEntries.getLeaderCommit());
+        }
+
+        sender.tell(new AppendEntriesReply(currentTerm(), true), actor());
+
+        return handleAppendEntries(sender, appendEntries, raftState);
+    }
+
     /**
      * Derived classes should not directly handle AppendEntriesReply messages it
      * should let the base class handle it first. Once the base class handles
@@ -98,18 +175,19 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply, RaftState suggestedState);
 
-    protected RaftState handleRequestVote(ActorRef sender,
-        RequestVote requestVote, RaftState suggestedState){
+    protected RaftState requestVote(ActorRef sender,
+        RequestVote requestVote, RaftState suggestedState) {
 
         boolean grantVote = false;
 
         //  Reply false if term < currentTerm (§5.1)
-        if(requestVote.getTerm() < currentTerm()){
+        if (requestVote.getTerm() < currentTerm()) {
             grantVote = false;
 
-        // If votedFor is null or candidateId, and candidate’s log is at
-        // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
-        } else if (votedFor() == null || votedFor().equals(requestVote.getCandidateId())) {
+            // If votedFor is null or candidateId, and candidate’s log is at
+            // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
+        } else if (votedFor() == null || votedFor()
+            .equals(requestVote.getCandidateId())) {
 
             boolean candidateLatest = false;
 
@@ -120,13 +198,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
             // the log with the later term is more up-to-date. If the logs
             // end with the same term, then whichever log is longer is
             // more up-to-date.
-            if(requestVote.getLastLogTerm() > lastTerm()){
+            if (requestVote.getLastLogTerm() > lastTerm()) {
                 candidateLatest = true;
-            } else if((requestVote.getLastLogTerm() == lastTerm()) && requestVote.getLastLogIndex() >= lastTerm()){
+            } else if ((requestVote.getLastLogTerm() == lastTerm())
+                && requestVote.getLastLogIndex() >= lastTerm()) {
                 candidateLatest = true;
             }
 
-            if(candidateLatest) {
+            if (candidateLatest) {
                 grantVote = true;
                 context.getTermInformation().update(requestVote.getTerm(),
                     requestVote.getCandidateId());
@@ -163,9 +242,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      */
     protected abstract RaftState state();
 
-    protected FiniteDuration electionDuration(){
+    protected FiniteDuration electionDuration() {
         long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
-        return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, TimeUnit.MILLISECONDS);
+        return new FiniteDuration(ELECTION_TIME_INTERVAL + variance,
+            TimeUnit.MILLISECONDS);
     }
 
     protected void scheduleElection(FiniteDuration interval) {
@@ -182,15 +262,15 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
                 context.getActorSystem().dispatcher(), context.getActor());
     }
 
-    protected long currentTerm(){
+    protected long currentTerm() {
         return context.getTermInformation().getCurrentTerm();
     }
 
-    protected String votedFor(){
+    protected String votedFor() {
         return context.getTermInformation().getVotedFor();
     }
 
-    protected ActorRef actor(){
+    protected ActorRef actor() {
         return context.getActor();
     }
 
@@ -210,18 +290,15 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
             raftState = applyTerm((RaftRPC) message);
         }
         if (message instanceof AppendEntries) {
-            AppendEntries appendEntries = (AppendEntries) message;
-            if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
-                applyLogToStateMachine(appendEntries.getLeaderCommit());
-            }
-            raftState = handleAppendEntries(sender, appendEntries, raftState);
+            raftState = appendEntries(sender, (AppendEntries) message,
+                raftState);
         } else if (message instanceof AppendEntriesReply) {
             raftState =
                 handleAppendEntriesReply(sender, (AppendEntriesReply) message,
                     raftState);
         } else if (message instanceof RequestVote) {
             raftState =
-                handleRequestVote(sender, (RequestVote) message, raftState);
+                requestVote(sender, (RequestVote) message, raftState);
         } else if (message instanceof RequestVoteReply) {
             raftState =
                 handleRequestVoteReply(sender, (RequestVoteReply) message,