BUG-2187: Add Server - Leader Implementation
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupport.java
index 70ef3691003038b870cd8b9467a5f44fdd3ce382..0c34158ca33c05dd751a66cb8eda8b4f252596cb 100644 (file)
@@ -7,15 +7,27 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
+import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Handles server configuration related messages for a RaftActor.
@@ -24,8 +36,11 @@ import org.slf4j.LoggerFactory;
  */
 class RaftActorServerConfigurationSupport {
     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
-
     private final RaftActorContext context;
+    // client follower queue
+    private final Queue<CatchupFollowerInfo> followerInfoQueue = new LinkedList<CatchupFollowerInfo>();
+    // timeout handle
+    private Cancellable followerTimeout = null;
 
     RaftActorServerConfigurationSupport(RaftActorContext context) {
         this.context = context;
@@ -35,6 +50,26 @@ class RaftActorServerConfigurationSupport {
         if(message instanceof AddServer) {
             onAddServer((AddServer)message, raftActor, sender);
             return true;
+        } else if (message instanceof FollowerCatchUpTimeout){
+            FollowerCatchUpTimeout followerTimeout  = (FollowerCatchUpTimeout)message;
+            // abort follower catchup on timeout
+            onFollowerCatchupTimeout(raftActor, sender, followerTimeout.getNewServerId());
+            return true;
+        } else if (message instanceof UnInitializedFollowerSnapshotReply){
+            // snapshot installation is successful
+            onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply)message, raftActor,sender);
+            return true;
+        } else if(message instanceof ApplyState){
+            ApplyState applyState = (ApplyState) message;
+            Payload data = applyState.getReplicatedLogEntry().getData();
+            if( data instanceof ServerConfigurationPayload){
+                 LOG.info("Server configuration : {} has been replicated to a majority of cluster servers succesfully",
+                                                                                    (ServerConfigurationPayload)data);
+                 // respond ok to follower
+                 respondToClient(raftActor, ServerChangeStatus.OK);
+                 return true;
+            }
+            return false;
         } else {
             return false;
         }
@@ -42,26 +77,35 @@ class RaftActorServerConfigurationSupport {
 
     private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
         LOG.debug("onAddServer: {}", addServer);
-
         if(noLeaderOrForwardedToLeader(addServer, raftActor, sender)) {
             return;
         }
 
-        // TODO - check if a server config is in progress. If so, cache this AddServer request to be processed
-        // after the current one is done.
-
-        context.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress());
+        CatchupFollowerInfo followerInfo = new CatchupFollowerInfo(addServer,sender);
+        boolean process = !followerInfoQueue.isEmpty();
+        followerInfoQueue.add(followerInfo);
+        if(process) {
+            processAddServer(raftActor);
+        }
+    }
 
+    private void processAddServer(RaftActor raftActor){
+        LOG.debug("In processAddServer");
         AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
-        FollowerState initialState = addServer.isVotingMember() ? FollowerState.VOTING_NOT_INITIALIZED :
+        AddServer addSrv = followerInfoQueue.peek().getAddServer();
+        context.addToPeers(addSrv.getNewServerId(), addSrv.getNewServerAddress());
+
+        // if voting member - initialize to VOTING_NOT_INITIALIZED
+        FollowerState initialState = addSrv.isVotingMember() ? FollowerState.VOTING_NOT_INITIALIZED :
             FollowerState.NON_VOTING;
-        leader.addFollower(addServer.getNewServerId(), initialState);
+        leader.addFollower(addSrv.getNewServerId(), initialState);
 
         // TODO
         // if initialState == FollowerState.VOTING_NOT_INITIALIZED
         //     Initiate snapshot via leader.initiateCaptureSnapshot(addServer.getNewServerId())
         //     Start a timer to abort the operation after a period of time (maybe 2 times election timeout)
-        //     Set local instance state and wait for message from the AbstractLeader when install snapshot is done and return now
+        //     Set local instance state and wait for message from the AbstractLeader when install snapshot
+        //     is done and return now
         //     When install snapshot message is received, go to step 1
         // else
         //     go to step 2
@@ -74,9 +118,20 @@ class RaftActorServerConfigurationSupport {
         //       on ApplyState, check if ReplicatedLogEntry payload is ServerConfigurationPayload and call
         //       this class.
         //
-
-        // TODO - temporary
-        sender.tell(new AddServerReply(ServerChangeStatus.OK, raftActor.getLeaderId()), raftActor.self());
+        if(initialState == FollowerState.VOTING_NOT_INITIALIZED){
+            LOG.debug("Leader sending initiate capture snapshot to follower : {}", addSrv.getNewServerId());
+            leader.initiateCaptureSnapshot(addSrv.getNewServerId());
+            // schedule the catchup timeout timer
+            followerTimeout = context.getActorSystem().scheduler()
+               .scheduleOnce(new FiniteDuration(((context.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2),
+                TimeUnit.MILLISECONDS),
+                context.getActor(), new FollowerCatchUpTimeout(addSrv.getNewServerId()),
+                context.getActorSystem().dispatcher(), context.getActor());
+        } else {
+            LOG.debug("Directly persisting  the new server configuration : {}", addSrv.getNewServerId());
+            persistNewServerConfiguration(raftActor, followerInfoQueue.peek().getClientRequestor(),
+                                                                                 addSrv.getNewServerId());
+        }
     }
 
     private boolean noLeaderOrForwardedToLeader(Object message, RaftActor raftActor, ActorRef sender) {
@@ -95,4 +150,86 @@ class RaftActorServerConfigurationSupport {
 
         return true;
     }
+
+    private void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply,
+                                                 RaftActor raftActor, ActorRef sender){
+
+        String followerId = reply.getFollowerId();
+        AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
+        FollowerLogInformation followerLogInformation = leader.getFollower(followerId);
+        stopFollowerTimer();
+        followerLogInformation.setFollowerState(FollowerState.VOTING);
+        leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
+        persistNewServerConfiguration(raftActor, sender, followerId);
+    }
+
+    private void persistNewServerConfiguration(RaftActor raftActor, ActorRef sender, String followerId){
+        /* get old server configuration list */
+        Map<String, String> tempMap =  context.getPeerAddresses();
+        List<String> cOld = new ArrayList<String>();
+        for (Map.Entry<String, String> entry : tempMap.entrySet()) {
+            if(!entry.getKey().equals(followerId)){
+                cOld.add(entry.getKey());
+            }
+        }
+        LOG.debug("Cold server configuration : {}",  cOld.toString());
+        /* get new server configuration list */
+        List <String> cNew = new ArrayList<String>(cOld);
+        cNew.add(followerId);
+        LOG.debug("Cnew server configuration : {}",  cNew.toString());
+        // construct the peer list
+        ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cOld, cNew);
+        /* TODO - persist new configuration - CHECK WHETHER USING getId below is correct */
+        raftActor.persistData(sender, context.getId(), servPayload);
+   }
+
+   private void stopFollowerTimer() {
+        if (followerTimeout != null && !followerTimeout.isCancelled()) {
+            followerTimeout.cancel();
+        }
+   }
+
+   private void onFollowerCatchupTimeout(RaftActor raftActor, ActorRef sender, String serverId){
+
+        LOG.debug("onFollowerCatchupTimeout: {}",  serverId);
+        AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
+        // cleanup
+        context.removePeer(serverId);
+        leader.removeFollower(serverId);
+        LOG.warn("onFollowerCatchupTimeout - Timeout occured for server - {} while installing snapshot", serverId);
+        respondToClient(raftActor,ServerChangeStatus.TIMEOUT);
+   }
+
+   private void respondToClient(RaftActor raftActor, ServerChangeStatus result){
+
+        int size = followerInfoQueue.size();
+
+        // remove the entry from the queue
+        CatchupFollowerInfo fInfo = followerInfoQueue.remove();
+        // get the sender
+        ActorRef toClient = fInfo.getClientRequestor();
+
+        toClient.tell(new AddServerReply(result, raftActor.getLeaderId()), raftActor.self());
+        LOG.debug("Response returned is {} for server {} ",  result, fInfo.getAddServer().getNewServerId());
+        if(!followerInfoQueue.isEmpty()){
+            processAddServer(raftActor);
+        }
+   }
+
+    // mantain sender actorRef
+    private class CatchupFollowerInfo {
+        private final AddServer addServer;
+        private final ActorRef clientRequestor;
+
+        CatchupFollowerInfo(AddServer addSrv, ActorRef cliReq){
+            addServer = addSrv;
+            clientRequestor = cliReq;
+        }
+        public AddServer getAddServer(){
+            return addServer;
+        }
+        public ActorRef getClientRequestor(){
+            return clientRequestor;
+        }
+    }
 }