Add replication capability to Shard
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Candidate.java
index ecd49012461a7b1ee76f4678c7451a4fa1edbfb1..c125bd32b60a5c5d714ea13e7008417f904d88dc 100644 (file)
@@ -12,16 +12,14 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Set;
 
 /**
  * The behavior of a RaftActor when it is in the CandidateState
@@ -43,24 +41,20 @@ import java.util.Map;
  */
 public class Candidate extends AbstractRaftActorBehavior {
 
-    private final Map<String, ActorSelection> peerToActor = new HashMap<>();
-
     private int voteCount;
 
     private final int votesRequired;
 
+    private final Set<String> peers;
+
     public Candidate(RaftActorContext context) {
         super(context);
 
-        Collection<String> peerPaths = context.getPeerAddresses().values();
+        peers = context.getPeerAddresses().keySet();
 
-        for (String peerPath : peerPaths) {
-            peerToActor.put(peerPath,
-                context.actorSelection(peerPath));
-        }
+        context.getLogger().debug("Election:Candidate has following peers:"+ peers);
 
-        context.getLogger().debug("Election:Candidate has following peers:"+peerToActor.keySet());
-        if(peerPaths.size() > 0) {
+        if(peers.size() > 0) {
             // Votes are required from a majority of the peers including self.
             // The votesRequired field therefore stores a calculated value
             // of the number of votes required for this candidate to win an
@@ -73,7 +67,7 @@ public class Candidate extends AbstractRaftActorBehavior {
             // 0 peers = 1 votesRequired (0 + 1) / 2 + 1 = 1
             // 2 peers = 2 votesRequired (2 + 1) / 2 + 1 = 2
             // 4 peers = 3 votesRequired (4 + 1) / 2 + 1 = 3
-            int noOfPeers = peerPaths.size();
+            int noOfPeers = peers.size();
             int self = 1;
             votesRequired = (noOfPeers + self) / 2 + 1;
         } else {
@@ -87,6 +81,8 @@ public class Candidate extends AbstractRaftActorBehavior {
     @Override protected RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
+        context.getLogger().info("Candidate: Received {}", appendEntries.toString());
+
         return state();
     }
 
@@ -115,10 +111,16 @@ public class Candidate extends AbstractRaftActorBehavior {
     }
 
     @Override
-    public RaftState handleMessage(ActorRef sender, Object message) {
+    public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+
+        Object message = fromSerializableMessage(originalMessage);
 
         if (message instanceof RaftRPC) {
+
             RaftRPC rpc = (RaftRPC) message;
+
+            context.getLogger().debug("RaftRPC message received {} my term is {}", rpc.toString(), context.getTermInformation().getCurrentTerm());
+
             // If RPC request or response contains term T > currentTerm:
             // set currentTerm = T, convert to follower (ยง5.1)
             // This applies to all RPC messages and responses
@@ -141,6 +143,7 @@ public class Candidate extends AbstractRaftActorBehavior {
             scheduleElection(electionDuration());
             return state();
         }
+
         return super.handleMessage(sender, message);
     }
 
@@ -153,21 +156,25 @@ public class Candidate extends AbstractRaftActorBehavior {
 
         // Increment the election term and vote for self
         long currentTerm = context.getTermInformation().getCurrentTerm();
-        context.getTermInformation().updateAndPersist(currentTerm + 1, context.getId());
+        context.getTermInformation().updateAndPersist(currentTerm + 1,
+            context.getId());
 
-        context.getLogger().debug("Starting new term " + (currentTerm+1));
+        context.getLogger().debug("Starting new term " + (currentTerm + 1));
 
         // Request for a vote
         // TODO: Retry request for vote if replies do not arrive in a reasonable
         // amount of time TBD
-        for (ActorSelection peerActor : peerToActor.values()) {
-            peerActor.tell(new RequestVote(
-                    context.getTermInformation().getCurrentTerm(),
-                    context.getId(),
-                    context.getReplicatedLog().lastIndex(),
-                    context.getReplicatedLog().lastTerm()),
-                context.getActor()
-            );
+        for (String peerId : peers) {
+            ActorSelection peerActor = context.getPeerActorSelection(peerId);
+            if(peerActor != null) {
+                peerActor.tell(new RequestVote(
+                        context.getTermInformation().getCurrentTerm(),
+                        context.getId(),
+                        context.getReplicatedLog().lastIndex(),
+                        context.getReplicatedLog().lastTerm()),
+                    context.getActor()
+                );
+            }
         }