Implement remove-all-shard-replicas RPC 60/32360/1
authorTom Pantelis <tpanteli@brocade.com>
Mon, 11 Jan 2016 16:45:01 +0000 (11:45 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 11 Jan 2016 19:10:58 +0000 (14:10 -0500)
Change-Id: Idc1481c0f6903554fd6659c32c9639af5aa47e92
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java

index 0c0f42a3fe7aa109aae7c7e49bce5f8cc600d6fd..068d326eff583320caf73fdecf922ab9c3a501d7 100644 (file)
@@ -45,9 +45,12 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToNonvotingForAllShardsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToNonvotingForAllShardsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResult;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResultBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
@@ -90,12 +93,12 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
     public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
         final String shardName = input.getShardName();
         if(Strings.isNullOrEmpty(shardName)) {
     public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
         final String shardName = input.getShardName();
         if(Strings.isNullOrEmpty(shardName)) {
-            return newFailedRpcResultBuilder("A valid shard name must be specified").buildFuture();
+            return newFailedRpcResultFuture("A valid shard name must be specified");
         }
 
         DataStoreType dataStoreType = input.getDataStoreType();
         if(dataStoreType == null) {
         }
 
         DataStoreType dataStoreType = input.getDataStoreType();
         if(dataStoreType == null) {
-            return newFailedRpcResultBuilder("A valid DataStoreType must be specified").buildFuture();
+            return newFailedRpcResultFuture("A valid DataStoreType must be specified");
         }
 
         LOG.info("Adding replica for shard {}", shardName);
         }
 
         LOG.info("Adding replica for shard {}", shardName);
@@ -123,17 +126,17 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
         final String shardName = input.getShardName();
         if(Strings.isNullOrEmpty(shardName)) {
     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
         final String shardName = input.getShardName();
         if(Strings.isNullOrEmpty(shardName)) {
-            return newFailedRpcResultBuilder("A valid shard name must be specified").buildFuture();
+            return newFailedRpcResultFuture("A valid shard name must be specified");
         }
 
         DataStoreType dataStoreType = input.getDataStoreType();
         if(dataStoreType == null) {
         }
 
         DataStoreType dataStoreType = input.getDataStoreType();
         if(dataStoreType == null) {
-            return newFailedRpcResultBuilder("A valid DataStoreType must be specified").buildFuture();
+            return newFailedRpcResultFuture("A valid DataStoreType must be specified");
         }
 
         final String memberName = input.getMemberName();
         if(Strings.isNullOrEmpty(memberName)) {
         }
 
         final String memberName = input.getMemberName();
         if(Strings.isNullOrEmpty(memberName)) {
-            return newFailedRpcResultBuilder("A valid member name must be specified").buildFuture();
+            return newFailedRpcResultFuture("A valid member name must be specified");
         }
 
         LOG.info("Removing replica for shard {} memberName {}, datastoreType {}", shardName, memberName, dataStoreType);
         }
 
         LOG.info("Removing replica for shard {} memberName {}, datastoreType {}", shardName, memberName, dataStoreType);
@@ -183,10 +186,31 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
 
 
     @Override
 
 
     @Override
-    public Future<RpcResult<Void>> removeAllShardReplicas() {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
+    public Future<RpcResult<RemoveAllShardReplicasOutput>> removeAllShardReplicas(RemoveAllShardReplicasInput input) {
+        LOG.info("Removing replicas for all shards");
+
+        final String memberName = input.getMemberName();
+        if(Strings.isNullOrEmpty(memberName)) {
+            return newFailedRpcResultFuture("A valid member name must be specified");
+        }
+
+        final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
+        Function<String, Object> messageSupplier = new Function<String, Object>() {
+            @Override
+            public Object apply(String shardName) {
+                return new RemoveShardReplica(shardName, memberName);
+            }
+        };
+
+        sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
+        sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
+
+        return waitForShardResults(shardResultData, new Function<List<ShardResult>, RemoveAllShardReplicasOutput>() {
+            @Override
+            public RemoveAllShardReplicasOutput apply(List<ShardResult> shardResults) {
+                return new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build();
+            }
+        }, "Failed to add replica");
     }
 
     @Override
     }
 
     @Override
@@ -209,7 +233,7 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         LOG.debug("backupDatastore: {}", input);
 
         if(Strings.isNullOrEmpty(input.getFilePath())) {
         LOG.debug("backupDatastore: {}", input);
 
         if(Strings.isNullOrEmpty(input.getFilePath())) {
-            return newFailedRpcResultBuilder("A valid file path must be specified").buildFuture();
+            return newFailedRpcResultFuture("A valid file path must be specified");
         }
 
         final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
         }
 
         final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
@@ -323,7 +347,8 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
     private static void onMessageFailure(String msg, final SettableFuture<RpcResult<Void>> returnFuture,
             Throwable failure) {
         LOG.error(msg, failure);
     private static void onMessageFailure(String msg, final SettableFuture<RpcResult<Void>> returnFuture,
             Throwable failure) {
         LOG.error(msg, failure);
-        returnFuture.set(newFailedRpcResultBuilder(String.format("%s: %s", msg, failure.getMessage())).build());
+        returnFuture.set(ClusterAdminRpcService.<Void>newFailedRpcResultBuilder(String.format("%s: %s", msg,
+                failure.getMessage())).build());
     }
 
     private <T> ListenableFuture<T> ask(ActorRef actor, Object message, Timeout timeout) {
     }
 
     private <T> ListenableFuture<T> ask(ActorRef actor, Object message, Timeout timeout) {
@@ -345,12 +370,16 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         return returnFuture;
     }
 
         return returnFuture;
     }
 
-    private static RpcResultBuilder<Void> newFailedRpcResultBuilder(String message) {
+    private static <T> ListenableFuture<RpcResult<T>> newFailedRpcResultFuture(String message) {
+        return ClusterAdminRpcService.<T>newFailedRpcResultBuilder(message).buildFuture();
+    }
+
+    private static <T> RpcResultBuilder<T> newFailedRpcResultBuilder(String message) {
         return newFailedRpcResultBuilder(message, null);
     }
 
         return newFailedRpcResultBuilder(message, null);
     }
 
-    private static RpcResultBuilder<Void> newFailedRpcResultBuilder(String message, Throwable cause) {
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.RPC, message, cause);
+    private static <T> RpcResultBuilder<T> newFailedRpcResultBuilder(String message, Throwable cause) {
+        return RpcResultBuilder.<T>failed().withError(ErrorType.RPC, message, cause);
     }
 
     private static RpcResult<Void> newSuccessfulResult() {
     }
 
     private static RpcResult<Void> newSuccessfulResult() {
index b88cbbba234ffbac4056c7eb058e06173d457c71..2d81db7117421ded2c4b5d662a74570c33063773 100644 (file)
@@ -39,14 +39,25 @@ module cluster-admin {
         }
     }
 
         }
     }
 
+    grouping shard-result-output {
+        list shard-result {
+            key "shard-name data-store-type";
+            uses shard-operation-result;
+
+            description "The list of results, one per shard";
+        }
+    }
+    
     rpc add-shard-replica {
         input {
             leaf shard-name {
     rpc add-shard-replica {
         input {
             leaf shard-name {
+                mandatory true;
                 type string;
                 description "The name of the shard for which to create a replica.";
             }
 
             leaf data-store-type {
                 type string;
                 description "The name of the shard for which to create a replica.";
             }
 
             leaf data-store-type {
+                mandatory true;
                 type data-store-type;
                 description "The type of the data store to which the replica belongs";
             }
                 type data-store-type;
                 description "The type of the data store to which the replica belongs";
             }
@@ -62,16 +73,19 @@ module cluster-admin {
     rpc remove-shard-replica {
         input {
             leaf shard-name {
     rpc remove-shard-replica {
         input {
             leaf shard-name {
+                mandatory true;
                 type string;
                 description "The name of the shard for which to remove the replica.";
             }
 
             leaf member-name {
                 type string;
                 description "The name of the shard for which to remove the replica.";
             }
 
             leaf member-name {
+                mandatory true;
                 type string;
                 description "The cluster member from which the shard replica should be removed";
             }
 
             leaf data-store-type {
                 type string;
                 description "The cluster member from which the shard replica should be removed";
             }
 
             leaf data-store-type {
+                mandatory true;
                 type data-store-type;
                 description "The type of the data store to which the replica belongs";
             }
                 type data-store-type;
                 description "The type of the data store to which the replica belongs";
             }
@@ -83,12 +97,7 @@ module cluster-admin {
 
     rpc add-replicas-for-all-shards {
         output {
 
     rpc add-replicas-for-all-shards {
         output {
-            list shard-result {
-                key "shard-name data-store-type";
-                uses shard-operation-result;
-
-                description "The list of results, one per shard";
-            }
+            uses shard-result-output;
         }
 
         description "Adds replicas on this node for all currently defined shards. This is equivalent to issuing
         }
 
         description "Adds replicas on this node for all currently defined shards. This is equivalent to issuing
@@ -96,6 +105,18 @@ module cluster-admin {
     }
 
     rpc remove-all-shard-replicas {
     }
 
     rpc remove-all-shard-replicas {
+        input {
+            leaf member-name {
+                mandatory true;
+                type string;
+                description "The cluster member from which the shard replicas should be removed";
+            }
+        }
+        
+        output {
+            uses shard-result-output;
+        }
+        
         description "Removes replicas for all shards on this node. This is equivalent to issuing
             a remove-shard-replica for all shards and essentially removes this node from a cluster.";
     }
         description "Removes replicas for all shards on this node. This is equivalent to issuing
             a remove-shard-replica for all shards and essentially removes this node from a cluster.";
     }
index 6e16dcfb097ddeffcf2fd0e740a00fd4e1546b7c..4b7619d11ac8df3a6aa18f4d35bb39e1527f4e26 100644 (file)
@@ -58,9 +58,11 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResult;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResultBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -457,8 +459,65 @@ public class ClusterAdminRpcServiceTest {
     }
 
     @Test
     }
 
     @Test
-    public void testRemoveAllShardReplicas() {
-        // TODO implement
+    public void testRemoveAllShardReplicas() throws Exception {
+        String name = "testRemoveAllShardReplicas";
+        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+        MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        leaderNode1.configDataStore().waitTillReady();
+        verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
+        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
+        verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
+
+        ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
+                "pets", null, Arrays.asList("member-1", "member-2", "member-3"));
+        leaderNode1.configDataStore().getActorContext().getShardManager().tell(
+                new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
+        leaderNode1.kit().expectMsgClass(Success.class);
+
+        replicaNode2.configDataStore().getActorContext().getShardManager().tell(
+                new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef());
+        replicaNode2.kit().expectMsgClass(Success.class);
+
+        replicaNode3.configDataStore().getActorContext().getShardManager().tell(
+                new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef());
+        replicaNode3.kit().expectMsgClass(Success.class);
+
+        verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3");
+        verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
+        verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
+
+        ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+                replicaNode3.operDataStore());
+
+        RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
+                new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
+        RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult);
+        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+                successShardResult("people", DataStoreType.Config),
+                successShardResult("pets", DataStoreType.Config),
+                successShardResult("cars", DataStoreType.Operational),
+                successShardResult("people", DataStoreType.Operational));
+
+        verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
+        verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2");
+        verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2");
+        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
+        verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1");
+        verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1");
+        verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
+        verifyNoShardPresent(replicaNode3.configDataStore(), "people");
+        verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
+
+        service3.close();
     }
 
     @Test
     }
 
     @Test