Change InstallSnapshot and reply to use Externalizable Proxy
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Candidate.java
index 774691154a8ee4782e02532916ebb9c7f29734a5..52ed26758ee26b6f9949fa1202f1a44628b82364 100644 (file)
@@ -9,18 +9,22 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.raft.PeerInfo;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
-import java.util.List;
-
 /**
  * The behavior of a RaftActor when it is in the CandidateState
- * <p>
+ * <p/>
  * Candidates (§5.2):
  * <ul>
  * <li> On conversion to candidate, start election:
@@ -37,39 +41,175 @@ import java.util.List;
  * </ul>
  */
 public class Candidate extends AbstractRaftActorBehavior {
-    private final List<String> peers;
 
-    public Candidate(RaftActorContext context, List<String> peers) {
-        super(context);
-        this.peers = peers;
+    private int voteCount;
+
+    private final int votesRequired;
+
+    private final Collection<String> votingPeers = new ArrayList<>();
+
+    public Candidate(RaftActorContext context) {
+        super(context, RaftState.Candidate);
+
+        for(PeerInfo peer: context.getPeers()) {
+            if(peer.isVoting()) {
+                votingPeers.add(peer.getId());
+            }
+        }
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{}: Election: Candidate has following voting peers: {}", logName(), votingPeers);
+        }
+
+        votesRequired = getMajorityVoteCount(votingPeers.size());
+
+        startNewTerm();
+
+        if(votingPeers.isEmpty()){
+            actor().tell(ElectionTimeout.INSTANCE, actor());
+        } else {
+            scheduleElection(electionDuration());
+        }
     }
 
-    @Override protected RaftState handleAppendEntries(ActorRef sender,
-        AppendEntries appendEntries, RaftState suggestedState) {
-        return suggestedState;
+    @Override
+    public final String getLeaderId() {
+        return null;
     }
 
-    @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
-        AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
-        return suggestedState;
+    @Override
+    public final short getLeaderPayloadVersion() {
+        return -1;
     }
 
-    @Override protected RaftState handleRequestVote(ActorRef sender,
-        RequestVote requestVote, RaftState suggestedState) {
-        return suggestedState;
+    @Override
+    protected RaftActorBehavior handleAppendEntries(ActorRef sender,
+        AppendEntries appendEntries) {
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
+        }
+
+        // Some other candidate for the same term became a leader and sent us an append entry
+        if(currentTerm() == appendEntries.getTerm()){
+            LOG.debug("{}: New Leader sent an append entry to Candidate for term {} will switch to Follower",
+                    logName(), currentTerm());
+
+            return switchBehavior(new Follower(context));
+        }
+
+        return this;
     }
 
-    @Override protected RaftState handleRequestVoteReply(ActorRef sender,
-        RequestVoteReply requestVoteReply, RaftState suggestedState) {
-        return suggestedState;
+    @Override
+    protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
+        return this;
     }
 
-    @Override protected RaftState state() {
-        return RaftState.Candidate;
+    @Override
+    protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) {
+        LOG.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount);
+
+        if (requestVoteReply.isVoteGranted()) {
+            voteCount++;
+        }
+
+        if (voteCount >= votesRequired) {
+            if(context.getCommitIndex() < context.getReplicatedLog().lastIndex()) {
+                LOG.debug("{}: Connmit index {} is behind last index {}", logName(), context.getCommitIndex(),
+                        context.getReplicatedLog().lastIndex());
+                return internalSwitchBehavior(RaftState.PreLeader);
+            } else {
+                return internalSwitchBehavior(RaftState.Leader);
+            }
+        }
+
+        return this;
     }
 
     @Override
-    public RaftState handleMessage(ActorRef sender, Object message) {
+    public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+        if (message instanceof ElectionTimeout) {
+            LOG.debug("{}: Received ElectionTimeout", logName());
+
+            if (votesRequired == 0) {
+                // If there are no peers then we should be a Leader
+                // We wait for the election timeout to occur before declare
+                // ourselves the leader. This gives enough time for a leader
+                // who we do not know about (as a peer)
+                // to send a message to the candidate
+
+                return internalSwitchBehavior(RaftState.Leader);
+            }
+
+            startNewTerm();
+            scheduleElection(electionDuration());
+            return this;
+        }
+
+        if (message instanceof RaftRPC) {
+
+            RaftRPC rpc = (RaftRPC) message;
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("{}: RaftRPC message received {}, my term is {}", logName(), rpc,
+                        context.getTermInformation().getCurrentTerm());
+            }
+
+            // If RPC request or response contains term T > currentTerm:
+            // set currentTerm = T, convert to follower (§5.1)
+            // This applies to all RPC messages and responses
+            if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+
+                // The raft paper does not say whether or not a Candidate can/should process a RequestVote in
+                // this case but doing so gains quicker convergence when the sender's log is more up-to-date.
+                if (message instanceof RequestVote) {
+                    super.handleMessage(sender, message);
+                }
+
+                return internalSwitchBehavior(RaftState.Follower);
+            }
+        }
+
         return super.handleMessage(sender, message);
     }
+
+
+    private void startNewTerm() {
+
+
+        // set voteCount back to 1 (that is voting for self)
+        voteCount = 1;
+
+        // Increment the election term and vote for self
+        long currentTerm = context.getTermInformation().getCurrentTerm();
+        long newTerm = currentTerm + 1;
+        context.getTermInformation().updateAndPersist(newTerm, context.getId());
+
+        LOG.debug("{}: Starting new term {}", logName(), newTerm);
+
+        // Request for a vote
+        // TODO: Retry request for vote if replies do not arrive in a reasonable
+        // amount of time TBD
+        for (String peerId : votingPeers) {
+            ActorSelection peerActor = context.getPeerActorSelection(peerId);
+            if(peerActor != null) {
+                RequestVote requestVote = new RequestVote(
+                        context.getTermInformation().getCurrentTerm(),
+                        context.getId(),
+                        context.getReplicatedLog().lastIndex(),
+                        context.getReplicatedLog().lastTerm());
+
+                LOG.debug("{}: Sending {} to peer {}", logName(), requestVote, peerId);
+
+                peerActor.tell(requestVote, context.getActor());
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        stopElection();
+    }
 }