From 07d596866600a0daa7f8ee018d50ed55b88fe683 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Mon, 11 Jan 2016 11:45:01 -0500 Subject: [PATCH] Implement remove-all-shard-replicas RPC Change-Id: Idc1481c0f6903554fd6659c32c9639af5aa47e92 Signed-off-by: Tom Pantelis --- .../admin/ClusterAdminRpcService.java | 61 ++++++++++++----- .../src/main/yang/cluster-admin.yang | 33 +++++++-- .../admin/ClusterAdminRpcServiceTest.java | 67 +++++++++++++++++-- 3 files changed, 135 insertions(+), 26 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java index 0c0f42a3fe..068d326eff 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java @@ -45,9 +45,12 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToNonvotingForAllShardsInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResult; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResultBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; @@ -90,12 +93,12 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl public Future> addShardReplica(final AddShardReplicaInput input) { final String shardName = input.getShardName(); if(Strings.isNullOrEmpty(shardName)) { - return newFailedRpcResultBuilder("A valid shard name must be specified").buildFuture(); + return newFailedRpcResultFuture("A valid shard name must be specified"); } DataStoreType dataStoreType = input.getDataStoreType(); if(dataStoreType == null) { - return newFailedRpcResultBuilder("A valid DataStoreType must be specified").buildFuture(); + return newFailedRpcResultFuture("A valid DataStoreType must be specified"); } LOG.info("Adding replica for shard {}", shardName); @@ -123,17 +126,17 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl public Future> removeShardReplica(RemoveShardReplicaInput input) { final String shardName = input.getShardName(); if(Strings.isNullOrEmpty(shardName)) { - return newFailedRpcResultBuilder("A valid shard name must be specified").buildFuture(); + return newFailedRpcResultFuture("A valid shard name must be specified"); } DataStoreType dataStoreType = input.getDataStoreType(); if(dataStoreType == null) { - return newFailedRpcResultBuilder("A valid DataStoreType must be specified").buildFuture(); + return newFailedRpcResultFuture("A valid DataStoreType must be specified"); } final String memberName = input.getMemberName(); if(Strings.isNullOrEmpty(memberName)) { - return newFailedRpcResultBuilder("A valid member name must be specified").buildFuture(); + return newFailedRpcResultFuture("A valid member name must be specified"); } LOG.info("Removing replica for shard {} memberName {}, datastoreType {}", shardName, memberName, dataStoreType); @@ -183,10 +186,31 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl @Override - public Future> removeAllShardReplicas() { - // TODO implement - return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "operation-not-supported", - "Not implemented yet").buildFuture(); + public Future> removeAllShardReplicas(RemoveAllShardReplicasInput input) { + LOG.info("Removing replicas for all shards"); + + final String memberName = input.getMemberName(); + if(Strings.isNullOrEmpty(memberName)) { + return newFailedRpcResultFuture("A valid member name must be specified"); + } + + final List, ShardResultBuilder>> shardResultData = new ArrayList<>(); + Function messageSupplier = new Function() { + @Override + public Object apply(String shardName) { + return new RemoveShardReplica(shardName, memberName); + } + }; + + sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier); + sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier); + + return waitForShardResults(shardResultData, new Function, RemoveAllShardReplicasOutput>() { + @Override + public RemoveAllShardReplicasOutput apply(List shardResults) { + return new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build(); + } + }, "Failed to add replica"); } @Override @@ -209,7 +233,7 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl LOG.debug("backupDatastore: {}", input); if(Strings.isNullOrEmpty(input.getFilePath())) { - return newFailedRpcResultBuilder("A valid file path must be specified").buildFuture(); + return newFailedRpcResultFuture("A valid file path must be specified"); } final SettableFuture> returnFuture = SettableFuture.create(); @@ -323,7 +347,8 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl private static void onMessageFailure(String msg, final SettableFuture> returnFuture, Throwable failure) { LOG.error(msg, failure); - returnFuture.set(newFailedRpcResultBuilder(String.format("%s: %s", msg, failure.getMessage())).build()); + returnFuture.set(ClusterAdminRpcService.newFailedRpcResultBuilder(String.format("%s: %s", msg, + failure.getMessage())).build()); } private ListenableFuture ask(ActorRef actor, Object message, Timeout timeout) { @@ -345,12 +370,16 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl return returnFuture; } - private static RpcResultBuilder newFailedRpcResultBuilder(String message) { + private static ListenableFuture> newFailedRpcResultFuture(String message) { + return ClusterAdminRpcService.newFailedRpcResultBuilder(message).buildFuture(); + } + + private static RpcResultBuilder newFailedRpcResultBuilder(String message) { return newFailedRpcResultBuilder(message, null); } - private static RpcResultBuilder newFailedRpcResultBuilder(String message, Throwable cause) { - return RpcResultBuilder.failed().withError(ErrorType.RPC, message, cause); + private static RpcResultBuilder newFailedRpcResultBuilder(String message, Throwable cause) { + return RpcResultBuilder.failed().withError(ErrorType.RPC, message, cause); } private static RpcResult newSuccessfulResult() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin.yang index b88cbbba23..2d81db7117 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin.yang @@ -39,14 +39,25 @@ module cluster-admin { } } + grouping shard-result-output { + list shard-result { + key "shard-name data-store-type"; + uses shard-operation-result; + + description "The list of results, one per shard"; + } + } + rpc add-shard-replica { input { leaf shard-name { + mandatory true; type string; description "The name of the shard for which to create a replica."; } leaf data-store-type { + mandatory true; type data-store-type; description "The type of the data store to which the replica belongs"; } @@ -62,16 +73,19 @@ module cluster-admin { rpc remove-shard-replica { input { leaf shard-name { + mandatory true; type string; description "The name of the shard for which to remove the replica."; } leaf member-name { + mandatory true; type string; description "The cluster member from which the shard replica should be removed"; } leaf data-store-type { + mandatory true; type data-store-type; description "The type of the data store to which the replica belongs"; } @@ -83,12 +97,7 @@ module cluster-admin { rpc add-replicas-for-all-shards { output { - list shard-result { - key "shard-name data-store-type"; - uses shard-operation-result; - - description "The list of results, one per shard"; - } + uses shard-result-output; } description "Adds replicas on this node for all currently defined shards. This is equivalent to issuing @@ -96,6 +105,18 @@ module cluster-admin { } rpc remove-all-shard-replicas { + input { + leaf member-name { + mandatory true; + type string; + description "The cluster member from which the shard replicas should be removed"; + } + } + + output { + uses shard-result-output; + } + description "Removes replicas for all shards on this node. This is equivalent to issuing a remove-shard-replica for all shards and essentially removes this node from a cluster."; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java index 6e16dcfb09..4b7619d11a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -58,9 +58,11 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResult; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResultBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -457,8 +459,65 @@ public class ClusterAdminRpcServiceTest { } @Test - public void testRemoveAllShardReplicas() { - // TODO implement + public void testRemoveAllShardReplicas() throws Exception { + String name = "testRemoveAllShardReplicas"; + String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; + MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build(); + + MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + leaderNode1.configDataStore().waitTillReady(); + verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3"); + verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3"); + verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2"); + + ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module", + "pets", null, Arrays.asList("member-1", "member-2", "member-3")); + leaderNode1.configDataStore().getActorContext().getShardManager().tell( + new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef()); + leaderNode1.kit().expectMsgClass(Success.class); + + replicaNode2.configDataStore().getActorContext().getShardManager().tell( + new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef()); + replicaNode2.kit().expectMsgClass(Success.class); + + replicaNode3.configDataStore().getActorContext().getShardManager().tell( + new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef()); + replicaNode3.kit().expectMsgClass(Success.class); + + verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3"); + verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3"); + verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2"); + + ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), + replicaNode3.operDataStore()); + + RpcResult rpcResult = service3.removeAllShardReplicas( + new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS); + RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult); + verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), + successShardResult("people", DataStoreType.Config), + successShardResult("pets", DataStoreType.Config), + successShardResult("cars", DataStoreType.Operational), + successShardResult("people", DataStoreType.Operational)); + + verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2"); + verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2"); + verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2"); + verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1"); + verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1"); + verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1"); + verifyNoShardPresent(replicaNode3.configDataStore(), "cars"); + verifyNoShardPresent(replicaNode3.configDataStore(), "people"); + verifyNoShardPresent(replicaNode3.configDataStore(), "pets"); + + service3.close(); } @Test -- 2.36.6