Bump upstream SNAPSHOTS
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Candidate.java
index c125bd32b60a5c5d714ea13e7008417f904d88dc..569f6b3d245fe5a6c1021f0537f08b3e1ec36f6e 100644 (file)
@@ -5,25 +5,29 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.raft.PeerInfo;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-
-import java.util.Set;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
- * The behavior of a RaftActor when it is in the CandidateState
- * <p/>
+ * The behavior of a RaftActor when it is in the Candidate raft state.
+ *
+ * <p>
  * Candidates (§5.2):
  * <ul>
  * <li> On conversion to candidate, start election:
@@ -39,148 +43,181 @@ import java.util.Set;
  * <li> If election timeout elapses: start new election
  * </ul>
  */
-public class Candidate extends AbstractRaftActorBehavior {
+public final class Candidate extends AbstractRaftActorBehavior {
 
     private int voteCount;
 
     private final int votesRequired;
 
-    private final Set<String> peers;
-
-    public Candidate(RaftActorContext context) {
-        super(context);
-
-        peers = context.getPeerAddresses().keySet();
-
-        context.getLogger().debug("Election:Candidate has following peers:"+ peers);
-
-        if(peers.size() > 0) {
-            // Votes are required from a majority of the peers including self.
-            // The votesRequired field therefore stores a calculated value
-            // of the number of votes required for this candidate to win an
-            // election based on it's known peers.
-            // If a peer was added during normal operation and raft replicas
-            // came to know about them then the new peer would also need to be
-            // taken into consideration when calculating this value.
-            // Here are some examples for what the votesRequired would be for n
-            // peers
-            // 0 peers = 1 votesRequired (0 + 1) / 2 + 1 = 1
-            // 2 peers = 2 votesRequired (2 + 1) / 2 + 1 = 2
-            // 4 peers = 3 votesRequired (4 + 1) / 2 + 1 = 3
-            int noOfPeers = peers.size();
-            int self = 1;
-            votesRequired = (noOfPeers + self) / 2 + 1;
-        } else {
-            votesRequired = 0;
+    private final Collection<String> votingPeers = new ArrayList<>();
+
+    public Candidate(final RaftActorContext context) {
+        super(context, RaftState.Candidate);
+
+        for (PeerInfo peer: context.getPeers()) {
+            if (peer.isVoting()) {
+                votingPeers.add(peer.getId());
+            }
         }
 
+        log.debug("{}: Election: Candidate has following voting peers: {}", logName(), votingPeers);
+
+        votesRequired = getMajorityVoteCount(votingPeers.size());
+
         startNewTerm();
-        scheduleElection(electionDuration());
-    }
 
-    @Override protected RaftState handleAppendEntries(ActorRef sender,
-        AppendEntries appendEntries) {
+        if (votingPeers.isEmpty()) {
+            actor().tell(ElectionTimeout.INSTANCE, actor());
+        } else {
+            scheduleElection(electionDuration());
+        }
+    }
 
-        context.getLogger().info("Candidate: Received {}", appendEntries.toString());
+    @Override
+    public String getLeaderId() {
+        return null;
+    }
 
-        return state();
+    @Override
+    public short getLeaderPayloadVersion() {
+        return -1;
     }
 
-    @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
-        AppendEntriesReply appendEntriesReply) {
+    @Override
+    protected RaftActorBehavior handleAppendEntries(final ActorRef sender, final AppendEntries appendEntries) {
+
+        log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
+
+        // Some other candidate for the same term became a leader and sent us an append entry
+        if (currentTerm() == appendEntries.getTerm()) {
+            log.info("{}: New Leader {} sent an AppendEntries to Candidate for term {} - will switch to Follower",
+                    logName(), appendEntries.getLeaderId(), currentTerm());
+
+            return switchBehavior(new Follower(context));
+        }
+
+        return this;
+    }
 
-        return state();
+    @Override
+    protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
+            final AppendEntriesReply appendEntriesReply) {
+        return this;
     }
 
-    @Override protected RaftState handleRequestVoteReply(ActorRef sender,
-        RequestVoteReply requestVoteReply) {
+    @Override
+    protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) {
+        log.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount);
 
         if (requestVoteReply.isVoteGranted()) {
             voteCount++;
         }
 
         if (voteCount >= votesRequired) {
-            return RaftState.Leader;
+            if (context.getLastApplied() < context.getReplicatedLog().lastIndex()) {
+                log.info("{}: LastApplied index {} is behind last index {} - switching to PreLeader",
+                        logName(), context.getLastApplied(), context.getReplicatedLog().lastIndex());
+                return internalSwitchBehavior(RaftState.PreLeader);
+            } else {
+                return internalSwitchBehavior(RaftState.Leader);
+            }
         }
 
-        return state();
+        return this;
+    }
+
+    @Override
+    protected FiniteDuration electionDuration() {
+        return super.electionDuration().$div(context.getConfigParams().getCandidateElectionTimeoutDivisor());
     }
 
-    @Override public RaftState state() {
-        return RaftState.Candidate;
+
+    @Override
+    ApplyState getApplyStateFor(final ReplicatedLogEntry entry) {
+        throw new IllegalStateException("A candidate should never attempt to apply " + entry);
     }
 
     @Override
-    public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+    public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
+        if (message instanceof ElectionTimeout) {
+            log.debug("{}: Received ElectionTimeout", logName());
 
-        Object message = fromSerializableMessage(originalMessage);
+            if (votesRequired == 0) {
+                // If there are no peers then we should be a Leader
+                // We wait for the election timeout to occur before declare
+                // ourselves the leader. This gives enough time for a leader
+                // who we do not know about (as a peer)
+                // to send a message to the candidate
+
+                return internalSwitchBehavior(RaftState.Leader);
+            }
+
+            startNewTerm();
+            scheduleElection(electionDuration());
+            return this;
+        }
 
         if (message instanceof RaftRPC) {
 
             RaftRPC rpc = (RaftRPC) message;
 
-            context.getLogger().debug("RaftRPC message received {} my term is {}", rpc.toString(), context.getTermInformation().getCurrentTerm());
+            log.debug("{}: RaftRPC message received {}, my term is {}", logName(), rpc,
+                        context.getTermInformation().getCurrentTerm());
 
             // If RPC request or response contains term T > currentTerm:
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+                log.info("{}: Term {} in \"{}\" message is greater than Candidate's term {} - switching to Follower",
+                        logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
-                return RaftState.Follower;
-            }
-        }
 
-        if (message instanceof ElectionTimeout) {
-            if (votesRequired == 0) {
-                // If there are no peers then we should be a Leader
-                // We wait for the election timeout to occur before declare
-                // ourselves the leader. This gives enough time for a leader
-                // who we do not know about (as a peer)
-                // to send a message to the candidate
-                return RaftState.Leader;
+                // The raft paper does not say whether or not a Candidate can/should process a RequestVote in
+                // this case but doing so gains quicker convergence when the sender's log is more up-to-date.
+                if (message instanceof RequestVote) {
+                    super.handleMessage(sender, message);
+                }
+
+                return internalSwitchBehavior(RaftState.Follower);
             }
-            startNewTerm();
-            scheduleElection(electionDuration());
-            return state();
         }
 
         return super.handleMessage(sender, message);
     }
 
-
     private void startNewTerm() {
-
-
         // set voteCount back to 1 (that is voting for self)
         voteCount = 1;
 
         // Increment the election term and vote for self
         long currentTerm = context.getTermInformation().getCurrentTerm();
-        context.getTermInformation().updateAndPersist(currentTerm + 1,
-            context.getId());
+        long newTerm = currentTerm + 1;
+        context.getTermInformation().updateAndPersist(newTerm, context.getId());
 
-        context.getLogger().debug("Starting new term " + (currentTerm + 1));
+        log.info("{}: Starting new election term {}", logName(), newTerm);
 
         // Request for a vote
         // TODO: Retry request for vote if replies do not arrive in a reasonable
         // amount of time TBD
-        for (String peerId : peers) {
+        for (String peerId : votingPeers) {
             ActorSelection peerActor = context.getPeerActorSelection(peerId);
-            if(peerActor != null) {
-                peerActor.tell(new RequestVote(
+            if (peerActor != null) {
+                RequestVote requestVote = new RequestVote(
                         context.getTermInformation().getCurrentTerm(),
                         context.getId(),
                         context.getReplicatedLog().lastIndex(),
-                        context.getReplicatedLog().lastTerm()),
-                    context.getActor()
-                );
-            }
-        }
+                        context.getReplicatedLog().lastTerm());
 
+                log.debug("{}: Sending {} to peer {}", logName(), requestVote, peerId);
 
+                peerActor.tell(requestVote, context.getActor());
+            }
+        }
     }
 
-    @Override public void close() throws Exception {
+    @Override
+    public void close() {
         stopElection();
     }
 }