From: Tom Pantelis Date: Thu, 3 Dec 2015 14:47:27 +0000 (-0500) Subject: Bug 2187: Add datastoreType to add-shard-replica RPC X-Git-Tag: release/beryllium~102 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=0b5ba62b192d1f1e1c5be6e884f2bacb40a5c2c3 Bug 2187: Add datastoreType to add-shard-replica RPC Change-Id: I4d24063d9fac5f23d207f621879c4369819c0bd5 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java index ba84296140..007b347558 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java @@ -34,6 +34,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToNonvotingForAllShardsInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; @@ -78,13 +79,18 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl return newFailedRpcResultBuilder("A valid shard name must be specified").buildFuture(); } + DataStoreType dataStoreType = input.getDataStoreType(); + if(dataStoreType == null) { + return newFailedRpcResultBuilder("A valid DataStoreType must be specified").buildFuture(); + } + LOG.info("Adding replica for shard {}", shardName); final SettableFuture> returnFuture = SettableFuture.create(); - ListenableFuture> future = sendMessageToShardManagers(new AddShardReplica(shardName)); - Futures.addCallback(future, new FutureCallback>() { + ListenableFuture future = sendMessageToShardManager(dataStoreType, new AddShardReplica(shardName)); + Futures.addCallback(future, new FutureCallback() { @Override - public void onSuccess(List snapshots) { + public void onSuccess(Success success) { LOG.info("Successfully added replica for shard {}", shardName); returnFuture.set(newSuccessfulResult()); } @@ -169,6 +175,12 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl return Futures.allAsList(configFuture, operFuture); } + private ListenableFuture sendMessageToShardManager(DataStoreType dataStoreType, Object message) { + ActorRef shardManager = dataStoreType == DataStoreType.Config ? + configDataStore.getActorContext().getShardManager() : operDataStore.getActorContext().getShardManager(); + return ask(shardManager, message, new Timeout(1, TimeUnit.MINUTES)); + } + private static void saveSnapshotsToFile(DatastoreSnapshotList snapshots, String fileName, SettableFuture> returnFuture) { try(FileOutputStream fos = new FileOutputStream(fileName)) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin.yang index 967e5554df..940741b720 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin.yang @@ -12,20 +12,25 @@ module cluster-admin { typedef data-store-type { type enumeration { - enum config { - value 1; - } - enum operational { - value 2; - } + enum config { + value 1; + } + enum operational { + value 2; + } } } rpc add-shard-replica { input { leaf shard-name { - type string; - description "The name of the shard for which to create a replica."; + type string; + description "The name of the shard for which to create a replica."; + } + + leaf data-store-type { + type data-store-type; + description "The type of the data store to which the replica belongs"; } } @@ -39,18 +44,18 @@ module cluster-admin { rpc remove-shard-replica { input { leaf shard-name { - type string; - description "The name of the shard for which to remove the replica."; + type string; + description "The name of the shard for which to remove the replica."; } leaf member-name { - type string; - description "The cluster member from which the shard replica should be removed"; + type string; + description "The cluster member from which the shard replica should be removed"; } leaf data-store-type { - type data-store-type; - description "The type of the data store to which the replica belongs"; + type data-store-type; + description "The type of the data store to which the replica belongs"; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java index a85f651823..765d8a1033 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -42,6 +42,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -80,7 +81,7 @@ public class ClusterAdminRpcServiceTest { RpcResult rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder(). setFilePath(fileName).build()).get(5, TimeUnit.SECONDS); - checkSuccessfulRpcResult(rpcResult); + verifySuccessfulRpcResult(rpcResult); try(FileInputStream fis = new FileInputStream(fileName)) { List snapshots = SerializationUtils.deserialize(fis); @@ -134,7 +135,7 @@ public class ClusterAdminRpcServiceTest { leaderNode1.waitForMembersUp("member-2"); - testAddShardReplica(newReplicaNode2, "cars", "member-1"); + doAddShardReplica(newReplicaNode2, "cars", "member-1"); MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). moduleShardsConfig(moduleShardsConfig).build(); @@ -142,7 +143,7 @@ public class ClusterAdminRpcServiceTest { leaderNode1.waitForMembersUp("member-3"); newReplicaNode2.waitForMembersUp("member-3"); - testAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2"); + doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2"); verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3"); verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3"); @@ -186,6 +187,30 @@ public class ClusterAdminRpcServiceTest { readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode); } + @Test + public void testAddShardReplicaFailures() throws Exception { + String name = "testAddShardReplicaFailures"; + MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name). + moduleShardsConfig("module-shards-cars-member-1.conf").build(); + + ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), + memberNode.operDataStore()); + + RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder(). + setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); + verifyFailedRpcResult(rpcResult); + + rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars"). + build()).get(10, TimeUnit.SECONDS); + verifyFailedRpcResult(rpcResult); + + rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people"). + setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); + verifyFailedRpcResult(rpcResult); + + service.close(); + } + private NormalizedNode writeCarsNodeAndVerify(DistributedDataStore writeToStore, DistributedDataStore readFromStore) throws Exception { DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction(); @@ -210,7 +235,7 @@ public class ClusterAdminRpcServiceTest { assertEquals("Data node", expCarsNode, optional.get()); } - private void testAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames) + private void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames) throws Exception { memberNode.waitForMembersUp(peerMemberNames); @@ -218,16 +243,24 @@ public class ClusterAdminRpcServiceTest { memberNode.operDataStore()); RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName). - build()).get(10, TimeUnit.SECONDS); - checkSuccessfulRpcResult(rpcResult); + setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); + verifySuccessfulRpcResult(rpcResult); verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames); + + Optional optional = memberNode.operDataStore().getActorContext().findLocalShard(shardName); + assertEquals("Oper shard present", false, optional.isPresent()); + + rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName). + setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS); + verifySuccessfulRpcResult(rpcResult); + verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames); service.close(); } - private void checkSuccessfulRpcResult(RpcResult rpcResult) { + private void verifySuccessfulRpcResult(RpcResult rpcResult) { if(!rpcResult.isSuccessful()) { if(rpcResult.getErrors().size() > 0) { RpcError error = Iterables.getFirst(rpcResult.getErrors(), null); @@ -238,6 +271,13 @@ public class ClusterAdminRpcServiceTest { } } + private void verifyFailedRpcResult(RpcResult rpcResult) { + assertEquals("RpcResult", false, rpcResult.isSuccessful()); + assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size()); + RpcError error = Iterables.getFirst(rpcResult.getErrors(), null); + assertNotNull("RpcResult error message null", error.getMessage()); + } + @Test public void testRemoveShardReplica() { // TODO implement