Merge "BUG 1839 - HTTP delete of non existing data"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Candidate.java
index fb480a9433954a4afa238bbc6b9c0931488d2b7c..4a3e2c5d664406844edaddee6308abf112b0f79c 100644 (file)
@@ -9,14 +9,21 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
-import java.util.List;
+import java.util.Set;
 
 /**
  * The behavior of a RaftActor when it is in the CandidateState
- * <p>
+ * <p/>
  * Candidates (§5.2):
  * <ul>
  * <li> On conversion to candidate, start election:
@@ -33,15 +40,157 @@ import java.util.List;
  * </ul>
  */
 public class Candidate extends AbstractRaftActorBehavior {
-    private final List<String> peers;
 
-    public Candidate(RaftActorContext context, List<String> peers) {
+    private int voteCount;
+
+    private final int votesRequired;
+
+    private final Set<String> peers;
+
+    public Candidate(RaftActorContext context) {
         super(context);
-        this.peers = peers;
+
+        peers = context.getPeerAddresses().keySet();
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Election:Candidate has following peers: {}", peers);
+        }
+
+        if(peers.size() > 0) {
+            // Votes are required from a majority of the peers including self.
+            // The votesRequired field therefore stores a calculated value
+            // of the number of votes required for this candidate to win an
+            // election based on it's known peers.
+            // If a peer was added during normal operation and raft replicas
+            // came to know about them then the new peer would also need to be
+            // taken into consideration when calculating this value.
+            // Here are some examples for what the votesRequired would be for n
+            // peers
+            // 0 peers = 1 votesRequired (0 + 1) / 2 + 1 = 1
+            // 2 peers = 2 votesRequired (2 + 1) / 2 + 1 = 2
+            // 4 peers = 3 votesRequired (4 + 1) / 2 + 1 = 3
+            int noOfPeers = peers.size();
+            int self = 1;
+            votesRequired = (noOfPeers + self) / 2 + 1;
+        } else {
+            votesRequired = 0;
+        }
+
+        startNewTerm();
+        scheduleElection(electionDuration());
     }
 
-    @Override
-    public RaftState handleMessage(ActorRef sender, Object message) {
+    @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
+        AppendEntries appendEntries) {
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug(appendEntries.toString());
+        }
+
+        return this;
+    }
+
+    @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
+        AppendEntriesReply appendEntriesReply) {
+
+        return this;
+    }
+
+    @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
+        RequestVoteReply requestVoteReply) {
+
+        if (requestVoteReply.isVoteGranted()) {
+            voteCount++;
+        }
+
+        if (voteCount >= votesRequired) {
+            return switchBehavior(new Leader(context));
+        }
+
+        return this;
+    }
+
+    @Override public RaftState state() {
         return RaftState.Candidate;
     }
+
+    @Override
+    public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+
+        Object message = fromSerializableMessage(originalMessage);
+
+        if (message instanceof RaftRPC) {
+
+            RaftRPC rpc = (RaftRPC) message;
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("RaftRPC message received {} my term is {}", rpc, context.getTermInformation().getCurrentTerm());
+            }
+
+            // If RPC request or response contains term T > currentTerm:
+            // set currentTerm = T, convert to follower (§5.1)
+            // This applies to all RPC messages and responses
+            if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+
+                return switchBehavior(new Follower(context));
+            }
+        }
+
+        if (message instanceof ElectionTimeout) {
+            if (votesRequired == 0) {
+                // If there are no peers then we should be a Leader
+                // We wait for the election timeout to occur before declare
+                // ourselves the leader. This gives enough time for a leader
+                // who we do not know about (as a peer)
+                // to send a message to the candidate
+
+                return switchBehavior(new Leader(context));
+            }
+            startNewTerm();
+            scheduleElection(electionDuration());
+            return this;
+        }
+
+        return super.handleMessage(sender, message);
+    }
+
+
+    private void startNewTerm() {
+
+
+        // set voteCount back to 1 (that is voting for self)
+        voteCount = 1;
+
+        // Increment the election term and vote for self
+        long currentTerm = context.getTermInformation().getCurrentTerm();
+        context.getTermInformation().updateAndPersist(currentTerm + 1,
+            context.getId());
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Starting new term {}", (currentTerm + 1));
+        }
+
+        // Request for a vote
+        // TODO: Retry request for vote if replies do not arrive in a reasonable
+        // amount of time TBD
+        for (String peerId : peers) {
+            ActorSelection peerActor = context.getPeerActorSelection(peerId);
+            if(peerActor != null) {
+                peerActor.tell(new RequestVote(
+                        context.getTermInformation().getCurrentTerm(),
+                        context.getId(),
+                        context.getReplicatedLog().lastIndex(),
+                        context.getReplicatedLog().lastTerm()),
+                    context.getActor()
+                );
+            }
+        }
+
+
+    }
+
+    @Override public void close() throws Exception {
+        stopElection();
+    }
 }