BUG 2187 Handle RemoveShardReplica in ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 85f40e3a07038520d18c7fd50bcfa6713c8002a1..3e14c5837dfadb2f4878167a74cbc64765d07899 100644 (file)
@@ -16,6 +16,7 @@ import akka.actor.Cancellable;
 import akka.actor.OneForOneStrategy;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.actor.Status;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
 import akka.dispatch.OnComplete;
@@ -90,6 +91,8 @@ import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
+import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
@@ -232,8 +235,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if(message instanceof PrimaryShardFoundForContext) {
             PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
             onPrimaryShardFoundContext(primaryShardFoundContext);
-        } else if(message instanceof RemoveShardReplica){
-            onRemoveShardReplica((RemoveShardReplica)message);
+        } else if(message instanceof RemoveShardReplica) {
+            onRemoveShardReplica((RemoveShardReplica) message);
+        } else if(message instanceof WrappedShardResponse){
+            onWrappedShardResponse((WrappedShardResponse) message);
         } else if(message instanceof GetSnapshot) {
             onGetSnapshot();
         } else if(message instanceof ServerRemoved){
@@ -248,12 +253,67 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private void onWrappedShardResponse(WrappedShardResponse message) {
+        if (message.getResponse() instanceof RemoveServerReply) {
+            onRemoveServerReply(getSender(), message.getShardName(), (RemoveServerReply) message.getResponse());
+        }
+    }
+
+    private void onRemoveServerReply(ActorRef originalSender, String shardName, RemoveServerReply response) {
+        shardReplicaOperationsInProgress.remove(shardName);
+        originalSender.tell(new Status.Success(null), self());
+    }
+
     private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
         if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
-            addShard(primaryShardFoundContext.shardName, primaryShardFoundContext.getRemotePrimaryShardFound(), getSender());
+            addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
+                    getSender());
+        } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
+            removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
+                    primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
         }
     }
 
+    private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
+            final ActorRef sender) {
+        if(isShardReplicaOperationInProgress(shardName, sender)) {
+            return;
+        }
+
+        shardReplicaOperationsInProgress.add(shardName);
+
+        final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
+
+        final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+
+        //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
+        LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+                primaryPath, shardId);
+
+        Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
+                duration());
+        Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
+                new RemoveServer(shardId.toString()), removeServerTimeout);
+
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                if (failure != null) {
+                    String msg = String.format("RemoveServer request to leader %s for shard %s failed",
+                            primaryPath, shardName);
+
+                    LOG.debug ("{}: {}", persistenceId(), msg, failure);
+
+                    // FAILURE
+                    sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+                } else {
+                    // SUCCESS
+                    self().tell(new WrappedShardResponse(shardName, response), sender);
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
     private void onShardReplicaRemoved(ServerRemoved message) {
         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
@@ -952,7 +1012,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             @Override
             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
-
             }
 
             @Override
@@ -1063,40 +1122,50 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
                     persistenceId(), shardName, replyMsg.getStatus());
 
-            Exception failure;
-            switch (replyMsg.getStatus()) {
-                case TIMEOUT:
-                    failure = new TimeoutException(String.format(
-                            "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
-                            "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
-                            leaderPath, shardName));
-                    break;
-                case NO_LEADER:
-                    failure = createNoShardLeaderException(shardInfo.getShardId());
-                    break;
-                default :
-                    failure = new RuntimeException(String.format(
-                            "AddServer request to leader %s for shard %s failed with status %s",
-                            leaderPath, shardName, replyMsg.getStatus()));
-            }
+            Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
 
             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
         }
     }
 
-    private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
-        String shardName = shardReplicaMsg.getShardName();
-
-        // verify the local shard replica is available in the controller node
-        if (!localShards.containsKey(shardName)) {
-            String msg = String.format("Local shard %s does not", shardName);
-            LOG.debug ("{}: {}", persistenceId(), msg);
-            getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
-            return;
+    private Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
+                                               String leaderPath, ShardIdentifier shardId) {
+        Exception failure;
+        switch (serverChangeStatus) {
+            case TIMEOUT:
+                failure = new TimeoutException(String.format(
+                        "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
+                        "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+                        leaderPath, shardId.getShardName()));
+                break;
+            case NO_LEADER:
+                failure = createNoShardLeaderException(shardId);
+                break;
+            case NOT_SUPPORTED:
+                failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
+                        serverChange.getSimpleName(), shardId.getShardName()));
+                break;
+            default :
+                failure = new RuntimeException(String.format(
+                        "%s request to leader %s for shard %s failed with status %s",
+                        serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
         }
-        // call RemoveShard for the shardName
-        getSender().tell(new akka.actor.Status.Success(true), getSelf());
-        return;
+        return failure;
+    }
+
+    private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
+        findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
+                shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
+            @Override
+            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+            }
+
+            @Override
+            public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
+                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+            }
+        });
     }
 
     private void persistShardList() {
@@ -1489,6 +1558,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private CountDownLatch waitTillReadyCountdownLatch;
         private PrimaryShardInfoFutureCache primaryShardInfoCache;
         private DatastoreSnapshot restoreFromSnapshot;
+
         private volatile boolean sealed;
 
         @SuppressWarnings("unchecked")
@@ -1642,10 +1712,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return shardName;
         }
 
-        public ActorRef getShardManagerActor() {
-            return shardManagerActor;
-        }
-
         @Override
         public void onFailure(Throwable failure) {
             LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
@@ -1675,45 +1741,60 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final RemotePrimaryShardFound remotePrimaryShardFound;
         private final LocalPrimaryShardFound localPrimaryShardFound;
 
-        public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, @Nonnull Object primaryFoundMessage) {
+        public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
+                @Nonnull Object primaryFoundMessage) {
             this.shardName = Preconditions.checkNotNull(shardName);
             this.contextMessage = Preconditions.checkNotNull(contextMessage);
             Preconditions.checkNotNull(primaryFoundMessage);
-            this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? (RemotePrimaryShardFound) primaryFoundMessage : null;
-            this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? (LocalPrimaryShardFound) primaryFoundMessage : null;
+            this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
+                    (RemotePrimaryShardFound) primaryFoundMessage : null;
+            this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
+                    (LocalPrimaryShardFound) primaryFoundMessage : null;
         }
 
         @Nonnull
-        public String getPrimaryPath(){
-            if(remotePrimaryShardFound != null){
+        String getPrimaryPath(){
+            if(remotePrimaryShardFound != null) {
                 return remotePrimaryShardFound.getPrimaryPath();
             }
             return localPrimaryShardFound.getPrimaryPath();
         }
 
         @Nonnull
-        public Object getContextMessage() {
+        Object getContextMessage() {
             return contextMessage;
         }
 
         @Nullable
-        public RemotePrimaryShardFound getRemotePrimaryShardFound(){
+        RemotePrimaryShardFound getRemotePrimaryShardFound() {
             return remotePrimaryShardFound;
         }
 
-        @Nullable
-        public LocalPrimaryShardFound getLocalPrimaryShardFound(){
-            return localPrimaryShardFound;
+        @Nonnull
+        String getShardName() {
+            return shardName;
         }
+    }
+
+    /**
+     * The WrappedShardResponse class wraps a response from a Shard.
+     */
+    private static class WrappedShardResponse {
+        private final String shardName;
+        private final Object response;
 
-        boolean isPrimaryLocal(){
-            return (remotePrimaryShardFound == null);
+        private WrappedShardResponse(String shardName, Object response) {
+            this.shardName = shardName;
+            this.response = response;
         }
 
-        @Nonnull
-        public String getShardName() {
+        String getShardName() {
             return shardName;
         }
+
+        Object getResponse() {
+            return response;
+        }
     }
 }