Merge "Avoid IllegalArgument on missing source"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Candidate.java
index f61905e393ff9e391c31ecccae9ac0bdfd1612ea..74bede171f1f6e6ad6b33feef6806d3c77321581 100644 (file)
@@ -10,26 +10,19 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import akka.actor.Cancellable;
+import java.util.Set;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * The behavior of a RaftActor when it is in the CandidateState
- * <p>
+ * <p/>
  * Candidates (§5.2):
  * <ul>
  * <li> On conversion to candidate, start election:
@@ -47,162 +40,145 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class Candidate extends AbstractRaftActorBehavior {
 
-    /**
-     * The maximum election time variance
-     */
-    private static final int ELECTION_TIME_MAX_VARIANCE = 100;
+    private int voteCount;
 
-    /**
-     * The interval in which a new election would get triggered if no leader is found
-     */
-    private static final long ELECTION_TIME_INTERVAL = Leader.HEART_BEAT_INTERVAL.toMillis() * 2;
+    private final int votesRequired;
 
-    /**
-     *
-     */
-    private final Map<String, ActorSelection> peerToActor = new HashMap<>();
+    private final Set<String> peers;
 
-    private Cancellable electionCancel = null;
+    public Candidate(RaftActorContext context) {
+        super(context, RaftState.Candidate);
 
-    private int voteCount;
+        peers = context.getPeerAddresses().keySet();
 
-    private final int votesRequired;
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{}: Election: Candidate has following peers: {}", logName(), peers);
+        }
 
-    public Candidate(RaftActorContext context, List<String> peerPaths) {
-        super(context);
+        votesRequired = getMajorityVoteCount(peers.size());
 
-        for (String peerPath : peerPaths) {
-            peerToActor.put(peerPath,
-                context.actorSelection(peerPath));
-        }
+        startNewTerm();
 
-        if(peerPaths.size() > 0) {
-            // Votes are required from a majority of the peers including self.
-            // The votesRequired field therefore stores a calculated value
-            // of the number of votes required for this candidate to win an
-            // election based on it's known peers.
-            // If a peer was added during normal operation and raft replicas
-            // came to know about them then the new peer would also need to be
-            // taken into consideration when calculating this value.
-            // Here are some examples for what the votesRequired would be for n
-            // peers
-            // 0 peers = 1 votesRequired (0 + 1) / 2 + 1 = 1
-            // 2 peers = 2 votesRequired (2 + 1) / 2 + 1 = 2
-            // 4 peers = 3 votesRequired (4 + 1) / 2 + 1 = 3
-            int noOfPeers = peerPaths.size();
-            int self = 1;
-            votesRequired = (noOfPeers + self) / 2 + 1;
+        if(context.getPeerAddresses().isEmpty()){
+            actor().tell(ELECTION_TIMEOUT, actor());
         } else {
-            votesRequired = 0;
+            scheduleElection(electionDuration());
         }
 
-        scheduleElection(randomizedDuration());
-    }
-
-    @Override protected RaftState handleAppendEntries(ActorRef sender,
-        AppendEntries appendEntries, RaftState suggestedState) {
-
-        // There is some peer who thinks it's a leader but is not
-        // I will not accept this append entries
-        sender.tell(new AppendEntriesReply(
-            context.getTermInformation().getCurrentTerm().get(), false),
-            context.getActor());
 
-        return suggestedState;
     }
 
-    @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
-        AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
+    @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
+        AppendEntries appendEntries) {
 
-        // Some peer thinks I was a leader and sent me a reply
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
+        }
 
-        return suggestedState;
+        return this;
     }
 
-    @Override protected RaftState handleRequestVote(ActorRef sender,
-        RequestVote requestVote, RaftState suggestedState) {
-
-        // We got this RequestVote because the term in there is less than
-        // or equal to our current term, so do not grant the vote
-        sender.tell(new RequestVoteReply(
-            context.getTermInformation().getCurrentTerm().get(), false),
-            context.getActor());
+    @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
+        AppendEntriesReply appendEntriesReply) {
 
-        return suggestedState;
+        return this;
     }
 
-    @Override protected RaftState handleRequestVoteReply(ActorRef sender,
-        RequestVoteReply requestVoteReply, RaftState suggestedState) {
-        if(suggestedState == RaftState.Follower) {
-            // If base class thinks I should be follower then I am
-            return suggestedState;
-        }
+    @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
+            RequestVoteReply requestVoteReply) {
 
-        if(requestVoteReply.isVoteGranted()){
+        LOG.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply,
+                voteCount);
+
+        if (requestVoteReply.isVoteGranted()) {
             voteCount++;
         }
 
-        if(voteCount >= votesRequired){
-            return RaftState.Leader;
+        if (voteCount >= votesRequired) {
+            return switchBehavior(new Leader(context));
         }
 
-        return state();
-    }
-
-    @Override protected RaftState state() {
-        return RaftState.Candidate;
+        return this;
     }
 
     @Override
-    public RaftState handleMessage(ActorRef sender, Object message) {
-        if(message instanceof ElectionTimeout){
-            if(votesRequired == 0){
+    public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+
+        Object message = fromSerializableMessage(originalMessage);
+
+        if (message instanceof RaftRPC) {
+
+            RaftRPC rpc = (RaftRPC) message;
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("{}: RaftRPC message received {}, my term is {}", logName(), rpc,
+                        context.getTermInformation().getCurrentTerm());
+            }
+
+            // If RPC request or response contains term T > currentTerm:
+            // set currentTerm = T, convert to follower (§5.1)
+            // This applies to all RPC messages and responses
+            if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+
+                return switchBehavior(new Follower(context));
+            }
+        }
+
+        if (message instanceof ElectionTimeout) {
+            LOG.debug("{}: Received ElectionTimeout", logName());
+
+            if (votesRequired == 0) {
                 // If there are no peers then we should be a Leader
                 // We wait for the election timeout to occur before declare
                 // ourselves the leader. This gives enough time for a leader
                 // who we do not know about (as a peer)
                 // to send a message to the candidate
-                return RaftState.Leader;
+
+                return switchBehavior(new Leader(context));
             }
-            scheduleElection(randomizedDuration());
-            return state();
+            startNewTerm();
+            scheduleElection(electionDuration());
+            return this;
         }
+
         return super.handleMessage(sender, message);
     }
 
-    private FiniteDuration randomizedDuration(){
-        long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
-        return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, TimeUnit.MILLISECONDS);
-    }
 
-    private void scheduleElection(FiniteDuration interval) {
+    private void startNewTerm() {
+
 
         // set voteCount back to 1 (that is voting for self)
         voteCount = 1;
 
         // Increment the election term and vote for self
-        AtomicLong currentTerm = context.getTermInformation().getCurrentTerm();
-        context.getTermInformation().update(currentTerm.incrementAndGet(), context.getId());
+        long currentTerm = context.getTermInformation().getCurrentTerm();
+        long newTerm = currentTerm + 1;
+        context.getTermInformation().updateAndPersist(newTerm, context.getId());
 
-        // Request for a vote
-        for(ActorSelection peerActor : peerToActor.values()){
-            peerActor.tell(new RequestVote(
-                context.getTermInformation().getCurrentTerm().get(),
-                context.getId(), context.getReplicatedLog().last().getIndex(),
-                context.getReplicatedLog().last().getTerm()),
-                context.getActor());
-        }
+        LOG.debug("{}: Starting new term {}", logName(), newTerm);
 
-        if (electionCancel != null && !electionCancel.isCancelled()) {
-            electionCancel.cancel();
+        // Request for a vote
+        // TODO: Retry request for vote if replies do not arrive in a reasonable
+        // amount of time TBD
+        for (String peerId : peers) {
+            ActorSelection peerActor = context.getPeerActorSelection(peerId);
+            if(peerActor != null) {
+                RequestVote requestVote = new RequestVote(
+                        context.getTermInformation().getCurrentTerm(),
+                        context.getId(),
+                        context.getReplicatedLog().lastIndex(),
+                        context.getReplicatedLog().lastTerm());
+
+                LOG.debug("{}: Sending {} to peer {}", logName(), requestVote, peerId);
+
+                peerActor.tell(requestVote, context.getActor());
+            }
         }
-
-        // Schedule an election. When the scheduler triggers an ElectionTimeout
-        // message is sent to itself
-        electionCancel =
-            context.getActorSystem().scheduler().scheduleOnce(interval,
-                context.getActor(), new ElectionTimeout(),
-                context.getActorSystem().dispatcher(), context.getActor());
     }
 
+    @Override public void close() throws Exception {
+        stopElection();
+    }
 }