import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToNonvotingForAllShardsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResult;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResultBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
final String shardName = input.getShardName();
if(Strings.isNullOrEmpty(shardName)) {
- return newFailedRpcResultBuilder("A valid shard name must be specified").buildFuture();
+ return newFailedRpcResultFuture("A valid shard name must be specified");
}
DataStoreType dataStoreType = input.getDataStoreType();
if(dataStoreType == null) {
- return newFailedRpcResultBuilder("A valid DataStoreType must be specified").buildFuture();
+ return newFailedRpcResultFuture("A valid DataStoreType must be specified");
}
LOG.info("Adding replica for shard {}", shardName);
public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
final String shardName = input.getShardName();
if(Strings.isNullOrEmpty(shardName)) {
- return newFailedRpcResultBuilder("A valid shard name must be specified").buildFuture();
+ return newFailedRpcResultFuture("A valid shard name must be specified");
}
DataStoreType dataStoreType = input.getDataStoreType();
if(dataStoreType == null) {
- return newFailedRpcResultBuilder("A valid DataStoreType must be specified").buildFuture();
+ return newFailedRpcResultFuture("A valid DataStoreType must be specified");
}
final String memberName = input.getMemberName();
if(Strings.isNullOrEmpty(memberName)) {
- return newFailedRpcResultBuilder("A valid member name must be specified").buildFuture();
+ return newFailedRpcResultFuture("A valid member name must be specified");
}
LOG.info("Removing replica for shard {} memberName {}, datastoreType {}", shardName, memberName, dataStoreType);
@Override
- public Future<RpcResult<Void>> removeAllShardReplicas() {
- // TODO implement
- return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
- "Not implemented yet").buildFuture();
+ public Future<RpcResult<RemoveAllShardReplicasOutput>> removeAllShardReplicas(RemoveAllShardReplicasInput input) {
+ LOG.info("Removing replicas for all shards");
+
+ final String memberName = input.getMemberName();
+ if(Strings.isNullOrEmpty(memberName)) {
+ return newFailedRpcResultFuture("A valid member name must be specified");
+ }
+
+ final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
+ Function<String, Object> messageSupplier = new Function<String, Object>() {
+ @Override
+ public Object apply(String shardName) {
+ return new RemoveShardReplica(shardName, memberName);
+ }
+ };
+
+ sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
+ sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
+
+ return waitForShardResults(shardResultData, new Function<List<ShardResult>, RemoveAllShardReplicasOutput>() {
+ @Override
+ public RemoveAllShardReplicasOutput apply(List<ShardResult> shardResults) {
+ return new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build();
+ }
+ }, "Failed to add replica");
}
@Override
LOG.debug("backupDatastore: {}", input);
if(Strings.isNullOrEmpty(input.getFilePath())) {
- return newFailedRpcResultBuilder("A valid file path must be specified").buildFuture();
+ return newFailedRpcResultFuture("A valid file path must be specified");
}
final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
private static void onMessageFailure(String msg, final SettableFuture<RpcResult<Void>> returnFuture,
Throwable failure) {
LOG.error(msg, failure);
- returnFuture.set(newFailedRpcResultBuilder(String.format("%s: %s", msg, failure.getMessage())).build());
+ returnFuture.set(ClusterAdminRpcService.<Void>newFailedRpcResultBuilder(String.format("%s: %s", msg,
+ failure.getMessage())).build());
}
private <T> ListenableFuture<T> ask(ActorRef actor, Object message, Timeout timeout) {
return returnFuture;
}
- private static RpcResultBuilder<Void> newFailedRpcResultBuilder(String message) {
+ private static <T> ListenableFuture<RpcResult<T>> newFailedRpcResultFuture(String message) {
+ return ClusterAdminRpcService.<T>newFailedRpcResultBuilder(message).buildFuture();
+ }
+
+ private static <T> RpcResultBuilder<T> newFailedRpcResultBuilder(String message) {
return newFailedRpcResultBuilder(message, null);
}
- private static RpcResultBuilder<Void> newFailedRpcResultBuilder(String message, Throwable cause) {
- return RpcResultBuilder.<Void>failed().withError(ErrorType.RPC, message, cause);
+ private static <T> RpcResultBuilder<T> newFailedRpcResultBuilder(String message, Throwable cause) {
+ return RpcResultBuilder.<T>failed().withError(ErrorType.RPC, message, cause);
}
private static RpcResult<Void> newSuccessfulResult() {
}
}
+ grouping shard-result-output {
+ list shard-result {
+ key "shard-name data-store-type";
+ uses shard-operation-result;
+
+ description "The list of results, one per shard";
+ }
+ }
+
rpc add-shard-replica {
input {
leaf shard-name {
+ mandatory true;
type string;
description "The name of the shard for which to create a replica.";
}
leaf data-store-type {
+ mandatory true;
type data-store-type;
description "The type of the data store to which the replica belongs";
}
rpc remove-shard-replica {
input {
leaf shard-name {
+ mandatory true;
type string;
description "The name of the shard for which to remove the replica.";
}
leaf member-name {
+ mandatory true;
type string;
description "The cluster member from which the shard replica should be removed";
}
leaf data-store-type {
+ mandatory true;
type data-store-type;
description "The type of the data store to which the replica belongs";
}
rpc add-replicas-for-all-shards {
output {
- list shard-result {
- key "shard-name data-store-type";
- uses shard-operation-result;
-
- description "The list of results, one per shard";
- }
+ uses shard-result-output;
}
description "Adds replicas on this node for all currently defined shards. This is equivalent to issuing
}
rpc remove-all-shard-replicas {
+ input {
+ leaf member-name {
+ mandatory true;
+ type string;
+ description "The cluster member from which the shard replicas should be removed";
+ }
+ }
+
+ output {
+ uses shard-result-output;
+ }
+
description "Removes replicas for all shards on this node. This is equivalent to issuing
a remove-shard-replica for all shards and essentially removes this node from a cluster.";
}
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResult;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResultBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
}
@Test
- public void testRemoveAllShardReplicas() {
- // TODO implement
+ public void testRemoveAllShardReplicas() throws Exception {
+ String name = "testRemoveAllShardReplicas";
+ String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+ MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+ MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ leaderNode1.configDataStore().waitTillReady();
+ verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
+ verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
+ verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
+
+ ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
+ "pets", null, Arrays.asList("member-1", "member-2", "member-3"));
+ leaderNode1.configDataStore().getActorContext().getShardManager().tell(
+ new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
+ leaderNode1.kit().expectMsgClass(Success.class);
+
+ replicaNode2.configDataStore().getActorContext().getShardManager().tell(
+ new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef());
+ replicaNode2.kit().expectMsgClass(Success.class);
+
+ replicaNode3.configDataStore().getActorContext().getShardManager().tell(
+ new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef());
+ replicaNode3.kit().expectMsgClass(Success.class);
+
+ verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3");
+ verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
+ verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
+
+ ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+ replicaNode3.operDataStore());
+
+ RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
+ new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
+ RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult);
+ verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+ successShardResult("people", DataStoreType.Config),
+ successShardResult("pets", DataStoreType.Config),
+ successShardResult("cars", DataStoreType.Operational),
+ successShardResult("people", DataStoreType.Operational));
+
+ verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
+ verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2");
+ verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2");
+ verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
+ verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1");
+ verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1");
+ verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
+ verifyNoShardPresent(replicaNode3.configDataStore(), "people");
+ verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
+
+ service3.close();
}
@Test