Bug 2187: AddServer unit test and bug fixes
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupport.java
index 0c34158ca33c05dd751a66cb8eda8b4f252596cb..ae23140114ed7d2ca8e73584ecd997fefd437723 100644 (file)
@@ -7,26 +7,27 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Queue;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.Cancellable;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
-import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -59,40 +60,71 @@ class RaftActorServerConfigurationSupport {
             // snapshot installation is successful
             onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply)message, raftActor,sender);
             return true;
-        } else if(message instanceof ApplyState){
-            ApplyState applyState = (ApplyState) message;
-            Payload data = applyState.getReplicatedLogEntry().getData();
-            if( data instanceof ServerConfigurationPayload){
-                 LOG.info("Server configuration : {} has been replicated to a majority of cluster servers succesfully",
-                                                                                    (ServerConfigurationPayload)data);
-                 // respond ok to follower
-                 respondToClient(raftActor, ServerChangeStatus.OK);
-                 return true;
-            }
-            return false;
+        } else if(message instanceof ApplyState) {
+            return onApplyState((ApplyState) message, raftActor);
         } else {
             return false;
         }
     }
 
+    private boolean onApplyState(ApplyState applyState, RaftActor raftActor) {
+        Payload data = applyState.getReplicatedLogEntry().getData();
+        if(data instanceof ServerConfigurationPayload) {
+            CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
+            if(followerInfo != null && followerInfo.getContextId().equals(applyState.getIdentifier())) {
+                LOG.info("{} has been successfully replicated to a majority of followers", data);
+
+                // respond OK to the client that requested the AddServer
+                respondToClient(raftActor, ServerChangeStatus.OK);
+            }
+
+            return true;
+        }
+
+        return false;
+    }
+
     private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
-        LOG.debug("onAddServer: {}", addServer);
+        LOG.debug("{}: onAddServer: {}", context.getId(), addServer);
         if(noLeaderOrForwardedToLeader(addServer, raftActor, sender)) {
             return;
         }
 
         CatchupFollowerInfo followerInfo = new CatchupFollowerInfo(addServer,sender);
-        boolean process = !followerInfoQueue.isEmpty();
+        boolean process = followerInfoQueue.isEmpty();
         followerInfoQueue.add(followerInfo);
         if(process) {
             processAddServer(raftActor);
         }
     }
 
+    /**
+     * The algorithm for AddServer is as follows:
+     * <ul>
+     * <li>Add the new server as a peer.</li>
+     * <li>Add the new follower to the leader.</li>
+     * <li>If the new server should be a voting member:
+     * <ul>
+     *     <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
+     *     <li>Initiate install snapshot to the new follower.</li>
+     *     <li>Once the snapshot is installed, mark the follower as VOTING and re-calculate majority vote count.</li>
+     * </ul></li>
+     * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
+     * <li>On replication consensus, respond to caller with OK.</li>
+     * </ul>
+     * If the install snapshot times out after a period of 2 * election timeout:
+     * <ul>
+     *     <li>Remove the new server as a peer.</li>
+     *     <li>Remove the new follower from the leader.</li>
+     *     <li>Respond to caller with TIMEOUT.</li>
+     * </ul>
+     */
     private void processAddServer(RaftActor raftActor){
-        LOG.debug("In processAddServer");
+        LOG.debug("{}: In processAddServer", context.getId());
+
         AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
-        AddServer addSrv = followerInfoQueue.peek().getAddServer();
+        CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
+        AddServer addSrv = followerInfo.getAddServer();
         context.addToPeers(addSrv.getNewServerId(), addSrv.getNewServerAddress());
 
         // if voting member - initialize to VOTING_NOT_INITIALIZED
@@ -100,24 +132,6 @@ class RaftActorServerConfigurationSupport {
             FollowerState.NON_VOTING;
         leader.addFollower(addSrv.getNewServerId(), initialState);
 
-        // TODO
-        // if initialState == FollowerState.VOTING_NOT_INITIALIZED
-        //     Initiate snapshot via leader.initiateCaptureSnapshot(addServer.getNewServerId())
-        //     Start a timer to abort the operation after a period of time (maybe 2 times election timeout)
-        //     Set local instance state and wait for message from the AbstractLeader when install snapshot
-        //     is done and return now
-        //     When install snapshot message is received, go to step 1
-        // else
-        //     go to step 2
-        //
-        // 1) tell AbstractLeader mark the follower as VOTING and recalculate minReplicationCount and
-        //        minIsolatedLeaderPeerCount
-        // 2) persist and replicate ServerConfigurationPayload via
-        //           raftActor.persistData(sender, uuid, newServerConfigurationPayload)
-        // 3) Wait for commit complete via ApplyState message in RaftActor or time it out. In RaftActor,
-        //       on ApplyState, check if ReplicatedLogEntry payload is ServerConfigurationPayload and call
-        //       this class.
-        //
         if(initialState == FollowerState.VOTING_NOT_INITIALIZED){
             LOG.debug("Leader sending initiate capture snapshot to follower : {}", addSrv.getNewServerId());
             leader.initiateCaptureSnapshot(addSrv.getNewServerId());
@@ -129,8 +143,7 @@ class RaftActorServerConfigurationSupport {
                 context.getActorSystem().dispatcher(), context.getActor());
         } else {
             LOG.debug("Directly persisting  the new server configuration : {}", addSrv.getNewServerId());
-            persistNewServerConfiguration(raftActor, followerInfoQueue.peek().getClientRequestor(),
-                                                                                 addSrv.getNewServerId());
+            persistNewServerConfiguration(raftActor, followerInfo);
         }
     }
 
@@ -153,6 +166,11 @@ class RaftActorServerConfigurationSupport {
 
     private void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply,
                                                  RaftActor raftActor, ActorRef sender){
+        CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
+        // Sanity check - it's possible we get a reply after it timed out.
+        if(followerInfo == null) {
+            return;
+        }
 
         String followerId = reply.getFollowerId();
         AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
@@ -160,27 +178,19 @@ class RaftActorServerConfigurationSupport {
         stopFollowerTimer();
         followerLogInformation.setFollowerState(FollowerState.VOTING);
         leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
-        persistNewServerConfiguration(raftActor, sender, followerId);
+
+        persistNewServerConfiguration(raftActor, followerInfo);
     }
 
-    private void persistNewServerConfiguration(RaftActor raftActor, ActorRef sender, String followerId){
-        /* get old server configuration list */
-        Map<String, String> tempMap =  context.getPeerAddresses();
-        List<String> cOld = new ArrayList<String>();
-        for (Map.Entry<String, String> entry : tempMap.entrySet()) {
-            if(!entry.getKey().equals(followerId)){
-                cOld.add(entry.getKey());
-            }
-        }
-        LOG.debug("Cold server configuration : {}",  cOld.toString());
-        /* get new server configuration list */
-        List <String> cNew = new ArrayList<String>(cOld);
-        cNew.add(followerId);
-        LOG.debug("Cnew server configuration : {}",  cNew.toString());
-        // construct the peer list
-        ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cOld, cNew);
-        /* TODO - persist new configuration - CHECK WHETHER USING getId below is correct */
-        raftActor.persistData(sender, context.getId(), servPayload);
+    private void persistNewServerConfiguration(RaftActor raftActor, CatchupFollowerInfo followerInfo){
+        List <String> cNew = new ArrayList<String>(context.getPeerAddresses().keySet());
+        cNew.add(context.getId());
+
+        LOG.debug("New server configuration : {}",  cNew.toString());
+
+        ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cNew, Collections.<String>emptyList());
+
+        raftActor.persistData(followerInfo.getClientRequestor(), followerInfo.getContextId(), servPayload);
    }
 
    private void stopFollowerTimer() {
@@ -190,22 +200,19 @@ class RaftActorServerConfigurationSupport {
    }
 
    private void onFollowerCatchupTimeout(RaftActor raftActor, ActorRef sender, String serverId){
-
         LOG.debug("onFollowerCatchupTimeout: {}",  serverId);
         AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
         // cleanup
         context.removePeer(serverId);
         leader.removeFollower(serverId);
-        LOG.warn("onFollowerCatchupTimeout - Timeout occured for server - {} while installing snapshot", serverId);
+        LOG.warn("Timeout occured for new server {} while installing snapshot", serverId);
         respondToClient(raftActor,ServerChangeStatus.TIMEOUT);
    }
 
    private void respondToClient(RaftActor raftActor, ServerChangeStatus result){
-
-        int size = followerInfoQueue.size();
-
         // remove the entry from the queue
         CatchupFollowerInfo fInfo = followerInfoQueue.remove();
+
         // get the sender
         ActorRef toClient = fInfo.getClientRequestor();
 
@@ -216,19 +223,27 @@ class RaftActorServerConfigurationSupport {
         }
    }
 
-    // mantain sender actorRef
-    private class CatchupFollowerInfo {
+    // maintain sender actorRef
+    private static class CatchupFollowerInfo {
         private final AddServer addServer;
         private final ActorRef clientRequestor;
+        private final String contextId;
 
         CatchupFollowerInfo(AddServer addSrv, ActorRef cliReq){
             addServer = addSrv;
             clientRequestor = cliReq;
+            contextId = UUID.randomUUID().toString();
         }
-        public AddServer getAddServer(){
+
+        String getContextId() {
+            return contextId;
+        }
+
+        AddServer getAddServer(){
             return addServer;
         }
-        public ActorRef getClientRequestor(){
+
+        ActorRef getClientRequestor(){
             return clientRequestor;
         }
     }