Properly handle RequestVote in all states
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractRaftActorBehavior.java
index d7a8d5abb35bdb258633b65a783aed9e0aa9677d..ae1baec471ef1d08abdcbb6d9c56052b95a87ffa 100644 (file)
@@ -9,13 +9,19 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.Cancellable;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Abstract class that represents the behavior of a RaftActor
@@ -34,6 +40,22 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      */
     protected final RaftActorContext context;
 
+    /**
+     * Exclusive upper bound, in milliseconds, of the random variance added to the election interval
+     */
+    private static final int ELECTION_TIME_MAX_VARIANCE = 100;
+
+    /**
+     * Base interval, in milliseconds, used for election timeouts: twice the Leader heartbeat interval
+     */
+    private static final long ELECTION_TIME_INTERVAL = Leader.HEART_BEAT_INTERVAL.toMillis() * 2;
+
+    /**
+     * Handle of the election timeout last scheduled by scheduleElection; null until one is scheduled
+     */
+
+    private Cancellable electionCancel = null;
+
 
     protected AbstractRaftActorBehavior(RaftActorContext context) {
         this.context = context;
@@ -76,23 +98,45 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply, RaftState suggestedState);
 
-    /**
-     * Derived classes should not directly handle RequestVote messages it
-     * should let the base class handle it first. Once the base class handles
-     * the RequestVote message and does the common actions that are applicable
-     * in all RaftState's it will delegate the handling of the RequestVote
-     * message to the derived class to do more state specific handling by calling
-     * this method
-     *
-     * @param sender         The actor that sent this message
-     * @param requestVote    The RequestVote message
-     * @param suggestedState The state that the RaftActor should be in based
-     *                       on the base class's processing of the RequestVote
-     *                       message
-     * @return
-     */
-    protected abstract RaftState handleRequestVote(ActorRef sender,
-        RequestVote requestVote, RaftState suggestedState);
+    /**
+     * Common RequestVote handling: decides whether to grant the vote and
+     * replies to the sender with a RequestVoteReply carrying the current term.
+     *
+     * @param sender         The actor that sent this message
+     * @param requestVote    The RequestVote message
+     * @param suggestedState The state to return; it is passed through unchanged
+     * @return suggestedState
+     */
+    protected RaftState handleRequestVote(ActorRef sender,
+        RequestVote requestVote, RaftState suggestedState){
+        boolean grantVote = false;
+
+        // Reply false if term < currentTerm (§5.1). Otherwise, if votedFor is
+        // null or candidateId, and the candidate's log is at least as
+        // up-to-date as the receiver's log, grant the vote (§5.2, §5.4)
+        if (requestVote.getTerm() >= currentTerm()
+            && (votedFor() == null || votedFor().equals(requestVote.getCandidateId()))) {
+
+            // From §5.4.1: if the logs have last entries with different terms,
+            // the log with the later term is more up-to-date. If the logs end
+            // with the same term, the log with the higher last index (the
+            // longer log) is more up-to-date, so compare against lastIndex().
+            boolean candidateLatest =
+                requestVote.getLastLogTerm() > lastTerm()
+                    || (requestVote.getLastLogTerm() == lastTerm()
+                        && requestVote.getLastLogIndex() >= lastIndex());
+
+            if (candidateLatest) {
+                grantVote = true;
+                context.getTermInformation().update(requestVote.getTerm(),
+                    requestVote.getCandidateId());
+            }
+        }
+
+        sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor());
+
+        return suggestedState;
+    }
 
     /**
      * Derived classes should not directly handle RequestVoteReply messages it
@@ -119,6 +163,46 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      */
     protected abstract RaftState state();
 
+    protected FiniteDuration electionDuration(){
+        // ELECTION_TIME_INTERVAL plus a random variance in [0, ELECTION_TIME_MAX_VARIANCE) milliseconds
+        return new FiniteDuration(ELECTION_TIME_INTERVAL + new Random().nextInt(ELECTION_TIME_MAX_VARIANCE), TimeUnit.MILLISECONDS);
+    }
+
+    protected void scheduleElection(FiniteDuration interval) {
+        // Cancel the previously scheduled timeout, if there is one still pending
+        final Cancellable pending = electionCancel;
+        if (pending != null && !pending.isCancelled()) {
+            pending.cancel();
+        }
+        // Ask the actor system's scheduler to send a single ElectionTimeout
+        // message to this behavior's own actor once the interval has elapsed
+        electionCancel =
+            context.getActorSystem().scheduler().scheduleOnce(interval,
+                context.getActor(), new ElectionTimeout(),
+                context.getActorSystem().dispatcher(), context.getActor());
+    }
+
+    protected long currentTerm(){
+        return context.getTermInformation().getCurrentTerm(); // term recorded in the context's term information
+    }
+
+    protected String votedFor(){
+        return context.getTermInformation().getVotedFor(); // can be null - handleRequestVote null-checks it before calling equals
+    }
+
+    protected ActorRef actor(){
+        return context.getActor(); // the ActorRef held by the context; used as the sender of vote replies
+    }
+
+    protected long lastTerm() {
+        return context.getReplicatedLog().last().getTerm(); // NOTE(review): NPE if last() returns null, e.g. for an empty log - confirm
+    }
+
+    protected long lastIndex() {
+        return context.getReplicatedLog().last().getIndex(); // NOTE(review): NPE if last() returns null, e.g. for an empty log - confirm
+    }
+
+
     @Override
     public RaftState handleMessage(ActorRef sender, Object message) {
         RaftState raftState = state();
@@ -127,8 +211,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         }
         if (message instanceof AppendEntries) {
             AppendEntries appendEntries = (AppendEntries) message;
-            if (appendEntries.getLeaderCommit() > context.getLastApplied()
-                .get()) {
+            if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
                 applyLogToStateMachine(appendEntries.getLeaderCommit());
             }
             raftState = handleAppendEntries(sender, appendEntries, raftState);
@@ -148,8 +231,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     private RaftState applyTerm(RaftRPC rpc) {
-        if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()
-            .get()) {
+        // If RPC request or response contains term T > currentTerm:
+        // set currentTerm = T, convert to follower (§5.1)
+        // This applies to all RPC messages and responses
+        if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
             context.getTermInformation().update(rpc.getTerm(), null);
             return RaftState.Follower;
         }
@@ -157,6 +242,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     private void applyLogToStateMachine(long index) {
-        context.getLastApplied().set(index);
+        // Only records index as the last applied index on the context; no message is
+        // sent here. NOTE(review): confirm whether notifying the RaftActor to apply the log is still to be added
+        context.setLastApplied(index);
     }
+
+
 }