Implement remove-all-shard-replicas RPC
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcService.java
index 0c0f42a3fe7aa109aae7c7e49bce5f8cc600d6fd..068d326eff583320caf73fdecf922ab9c3a501d7 100644 (file)
@@ -45,9 +45,12 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToNonvotingForAllShardsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResult;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResultBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
@@ -90,12 +93,12 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
     public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
         final String shardName = input.getShardName();
         if(Strings.isNullOrEmpty(shardName)) {
-            return newFailedRpcResultBuilder("A valid shard name must be specified").buildFuture();
+            return newFailedRpcResultFuture("A valid shard name must be specified");
         }
 
         DataStoreType dataStoreType = input.getDataStoreType();
         if(dataStoreType == null) {
-            return newFailedRpcResultBuilder("A valid DataStoreType must be specified").buildFuture();
+            return newFailedRpcResultFuture("A valid DataStoreType must be specified");
         }
 
         LOG.info("Adding replica for shard {}", shardName);
@@ -123,17 +126,17 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
         final String shardName = input.getShardName();
         if(Strings.isNullOrEmpty(shardName)) {
-            return newFailedRpcResultBuilder("A valid shard name must be specified").buildFuture();
+            return newFailedRpcResultFuture("A valid shard name must be specified");
         }
 
         DataStoreType dataStoreType = input.getDataStoreType();
         if(dataStoreType == null) {
-            return newFailedRpcResultBuilder("A valid DataStoreType must be specified").buildFuture();
+            return newFailedRpcResultFuture("A valid DataStoreType must be specified");
         }
 
         final String memberName = input.getMemberName();
         if(Strings.isNullOrEmpty(memberName)) {
-            return newFailedRpcResultBuilder("A valid member name must be specified").buildFuture();
+            return newFailedRpcResultFuture("A valid member name must be specified");
         }
 
         LOG.info("Removing replica for shard {} memberName {}, datastoreType {}", shardName, memberName, dataStoreType);
@@ -183,10 +186,31 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
 
 
     @Override
-    public Future<RpcResult<Void>> removeAllShardReplicas() {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
+    public Future<RpcResult<RemoveAllShardReplicasOutput>> removeAllShardReplicas(RemoveAllShardReplicasInput input) {
+        LOG.info("Removing replicas for all shards");
+
+        final String memberName = input.getMemberName();
+        if(Strings.isNullOrEmpty(memberName)) {
+            return newFailedRpcResultFuture("A valid member name must be specified");
+        }
+
+        final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
+        Function<String, Object> messageSupplier = new Function<String, Object>() {
+            @Override
+            public Object apply(String shardName) {
+                return new RemoveShardReplica(shardName, memberName);
+            }
+        };
+
+        sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
+        sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
+
+        return waitForShardResults(shardResultData, new Function<List<ShardResult>, RemoveAllShardReplicasOutput>() {
+            @Override
+            public RemoveAllShardReplicasOutput apply(List<ShardResult> shardResults) {
+                return new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build();
+            }
+        }, "Failed to add replica");
     }
 
     @Override
@@ -209,7 +233,7 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         LOG.debug("backupDatastore: {}", input);
 
         if(Strings.isNullOrEmpty(input.getFilePath())) {
-            return newFailedRpcResultBuilder("A valid file path must be specified").buildFuture();
+            return newFailedRpcResultFuture("A valid file path must be specified");
         }
 
         final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
@@ -323,7 +347,8 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
     private static void onMessageFailure(String msg, final SettableFuture<RpcResult<Void>> returnFuture,
             Throwable failure) {
         LOG.error(msg, failure);
-        returnFuture.set(newFailedRpcResultBuilder(String.format("%s: %s", msg, failure.getMessage())).build());
+        returnFuture.set(ClusterAdminRpcService.<Void>newFailedRpcResultBuilder(String.format("%s: %s", msg,
+                failure.getMessage())).build());
     }
 
     private <T> ListenableFuture<T> ask(ActorRef actor, Object message, Timeout timeout) {
@@ -345,12 +370,16 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         return returnFuture;
     }
 
-    private static RpcResultBuilder<Void> newFailedRpcResultBuilder(String message) {
+    private static <T> ListenableFuture<RpcResult<T>> newFailedRpcResultFuture(String message) {
+        return ClusterAdminRpcService.<T>newFailedRpcResultBuilder(message).buildFuture();
+    }
+
+    private static <T> RpcResultBuilder<T> newFailedRpcResultBuilder(String message) {
         return newFailedRpcResultBuilder(message, null);
     }
 
-    private static RpcResultBuilder<Void> newFailedRpcResultBuilder(String message, Throwable cause) {
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.RPC, message, cause);
+    private static <T> RpcResultBuilder<T> newFailedRpcResultBuilder(String message, Throwable cause) {
+        return RpcResultBuilder.<T>failed().withError(ErrorType.RPC, message, cause);
     }
 
     private static RpcResult<Void> newSuccessfulResult() {