Further Guava Optional cleanups
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Candidate.java
index 90120e9ac36dae88b4b8a9cb39a1f251ba0b7070..afa46892bea33754340f7ef6891f8cdbe646ed61 100644 (file)
@@ -21,10 +21,12 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
- * The behavior of a RaftActor when it is in the CandidateState
- * <p/>
+ * The behavior of a RaftActor when it is in the Candidate raft state.
+ *
+ * <p>
  * Candidates (§5.2):
  * <ul>
  * <li> On conversion to candidate, start election:
@@ -51,21 +53,19 @@ public class Candidate extends AbstractRaftActorBehavior {
     public Candidate(RaftActorContext context) {
         super(context, RaftState.Candidate);
 
-        for(PeerInfo peer: context.getPeers()) {
-            if(peer.isVoting()) {
+        for (PeerInfo peer: context.getPeers()) {
+            if (peer.isVoting()) {
                 votingPeers.add(peer.getId());
             }
         }
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Election: Candidate has following voting peers: {}", logName(), votingPeers);
-        }
+        log.debug("{}: Election: Candidate has following voting peers: {}", logName(), votingPeers);
 
         votesRequired = getMajorityVoteCount(votingPeers.size());
 
         startNewTerm();
 
-        if(votingPeers.isEmpty()){
+        if (votingPeers.isEmpty()) {
             actor().tell(ElectionTimeout.INSTANCE, actor());
         } else {
             scheduleElection(electionDuration());
@@ -83,17 +83,14 @@ public class Candidate extends AbstractRaftActorBehavior {
     }
 
     @Override
-    protected RaftActorBehavior handleAppendEntries(ActorRef sender,
-        AppendEntries appendEntries) {
+    protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) {
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
-        }
+        log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
 
         // Some other candidate for the same term became a leader and sent us an append entry
-        if(currentTerm() == appendEntries.getTerm()){
-            LOG.debug("{}: New Leader sent an append entry to Candidate for term {} will switch to Follower",
-                    logName(), currentTerm());
+        if (currentTerm() == appendEntries.getTerm()) {
+            log.info("{}: New Leader {} sent an AppendEntries to Candidate for term {} - will switch to Follower",
+                    logName(), appendEntries.getLeaderId(), currentTerm());
 
             return switchBehavior(new Follower(context));
         }
@@ -108,16 +105,16 @@ public class Candidate extends AbstractRaftActorBehavior {
 
     @Override
     protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) {
-        LOG.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount);
+        log.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount);
 
         if (requestVoteReply.isVoteGranted()) {
             voteCount++;
         }
 
         if (voteCount >= votesRequired) {
-            if(context.getCommitIndex() < context.getReplicatedLog().lastIndex()) {
-                LOG.debug("{}: Connmit index {} is behind last index {}", logName(), context.getCommitIndex(),
-                        context.getReplicatedLog().lastIndex());
+            if (context.getLastApplied() < context.getReplicatedLog().lastIndex()) {
+                log.info("{}: LastApplied index {} is behind last index {} - switching to PreLeader",
+                        logName(), context.getLastApplied(), context.getReplicatedLog().lastIndex());
                 return internalSwitchBehavior(RaftState.PreLeader);
             } else {
                 return internalSwitchBehavior(RaftState.Leader);
@@ -128,9 +125,14 @@ public class Candidate extends AbstractRaftActorBehavior {
     }
 
     @Override
-    public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
-        if (originalMessage instanceof ElectionTimeout) {
-            LOG.debug("{}: Received ElectionTimeout", logName());
+    protected FiniteDuration electionDuration() {
+        return super.electionDuration().$div(context.getConfigParams().getCandidateElectionTimeoutDivisor());
+    }
+
+    @Override
+    public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+        if (message instanceof ElectionTimeout) {
+            log.debug("{}: Received ElectionTimeout", logName());
 
             if (votesRequired == 0) {
                 // If there are no peers then we should be a Leader
@@ -147,20 +149,20 @@ public class Candidate extends AbstractRaftActorBehavior {
             return this;
         }
 
-        final Object message = fromSerializableMessage(originalMessage);
         if (message instanceof RaftRPC) {
 
             RaftRPC rpc = (RaftRPC) message;
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: RaftRPC message received {}, my term is {}", logName(), rpc,
+            log.debug("{}: RaftRPC message received {}, my term is {}", logName(), rpc,
                         context.getTermInformation().getCurrentTerm());
-            }
 
             // If RPC request or response contains term T > currentTerm:
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+                log.info("{}: Term {} in \"{}\" message is greater than Candidate's term {} - switching to Follower",
+                        logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
 
                 // The raft paper does not say whether or not a Candidate can/should process a RequestVote in
@@ -188,21 +190,21 @@ public class Candidate extends AbstractRaftActorBehavior {
         long newTerm = currentTerm + 1;
         context.getTermInformation().updateAndPersist(newTerm, context.getId());
 
-        LOG.debug("{}: Starting new term {}", logName(), newTerm);
+        log.info("{}: Starting new election term {}", logName(), newTerm);
 
         // Request for a vote
         // TODO: Retry request for vote if replies do not arrive in a reasonable
         // amount of time TBD
         for (String peerId : votingPeers) {
             ActorSelection peerActor = context.getPeerActorSelection(peerId);
-            if(peerActor != null) {
+            if (peerActor != null) {
                 RequestVote requestVote = new RequestVote(
                         context.getTermInformation().getCurrentTerm(),
                         context.getId(),
                         context.getReplicatedLog().lastIndex(),
                         context.getReplicatedLog().lastTerm());
 
-                LOG.debug("{}: Sending {} to peer {}", logName(), requestVote, peerId);
+                log.debug("{}: Sending {} to peer {}", logName(), requestVote, peerId);
 
                 peerActor.tell(requestVote, context.getActor());
             }