From be338c9e1dab83e2a5ff21819b92b934ef32faee Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Wed, 29 Mar 2017 12:14:33 +0200 Subject: [PATCH 1/1] Bug 7806 - Implement agent RPCs for shard replica manipulation testing These can be implemented as a part of ClusterAdminRpcService instead of creating new rpcs that would be part of the lowlevel suite. Change-Id: I891f9d3703a9357e829159691cbf18f95523d529 Signed-off-by: Tomas Cere --- .../src/main/yang/cluster-admin.yang | 43 +++++ .../md-sal/sal-cluster-admin-impl/pom.xml | 6 + .../admin/ClusterAdminRpcService.java | 89 +++++++++- .../opendaylight/blueprint/cluster-admin.xml | 6 +- .../admin/ClusterAdminRpcServiceTest.java | 135 ++++++++++++-- .../messages/AddPrefixShardReplica.java | 42 +++++ .../messages/RemovePrefixShardReplica.java | 48 +++++ .../datastore/shardmanager/ShardManager.java | 164 +++++++++++++++++- 8 files changed, 509 insertions(+), 24 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AddPrefixShardReplica.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemovePrefixShardReplica.java diff --git a/opendaylight/md-sal/sal-cluster-admin-api/src/main/yang/cluster-admin.yang b/opendaylight/md-sal/sal-cluster-admin-api/src/main/yang/cluster-admin.yang index 40fb5fbe71..8a3c58a162 100644 --- a/opendaylight/md-sal/sal-cluster-admin-api/src/main/yang/cluster-admin.yang +++ b/opendaylight/md-sal/sal-cluster-admin-api/src/main/yang/cluster-admin.yang @@ -109,6 +109,49 @@ module cluster-admin { described in the Raft paper."; } + rpc add-prefix-shard-replica { + input { + leaf shard-prefix { + mandatory true; + type instance-identifier; + } + + leaf data-store-type { + mandatory true; + type data-store-type; + description "The type of the data store to which the replica belongs"; + } + } + + description "Adds a replica of a shard to this node and joins it to an existing cluster. There must already be + a shard existing on another node with a leader. This RPC first contacts peer member seed nodes + searching for a shard. When found, an AddServer message is sent to the shard leader and applied as + described in the Raft paper."; + } + + rpc remove-prefix-shard-replica { + input { + leaf shard-prefix { + mandatory true; + type instance-identifier; + } + leaf member-name { + mandatory true; + type string; + description "The cluster member from which the shard replica should be removed"; + } + + leaf data-store-type { + mandatory true; + type data-store-type; + description "The type of the data store to which the replica belongs"; + } + } + + description "Removes an existing replica of a prefix shard from this node via the RemoveServer mechanism as + described in the Raft paper."; + } + rpc add-replicas-for-all-shards { output { uses shard-result-output; diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/pom.xml b/opendaylight/md-sal/sal-cluster-admin-impl/pom.xml index 2049629097..133507446b 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/pom.xml +++ b/opendaylight/md-sal/sal-cluster-admin-impl/pom.xml @@ -50,6 +50,12 @@ org.opendaylight.yangtools yang-test-util + + org.opendaylight.controller.samples + clustering-it-model + 1.5.0-SNAPSHOT + test + diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java index d51753c3ea..3627bd80fa 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java @@ -33,14 +33,18 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; +import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus; +import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; +import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput; @@ -56,13 +60,16 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,11 +85,14 @@ public class ClusterAdminRpcService implements ClusterAdminService { private final DistributedDataStoreInterface configDataStore; private final DistributedDataStoreInterface operDataStore; + private final BindingNormalizedNodeSerializer serializer; public ClusterAdminRpcService(DistributedDataStoreInterface configDataStore, - DistributedDataStoreInterface operDataStore) { + DistributedDataStoreInterface operDataStore, + BindingNormalizedNodeSerializer serializer) { this.configDataStore = configDataStore; this.operDataStore = operDataStore; + this.serializer = serializer; } @Override @@ -157,6 +167,83 @@ public class ClusterAdminRpcService implements ClusterAdminService { return returnFuture; } + @Override + public Future> addPrefixShardReplica(final AddPrefixShardReplicaInput input) { + + final InstanceIdentifier identifier = input.getShardPrefix(); + if (identifier == null) { + return newFailedRpcResultFuture("A valid shard identifier must be specified"); + } + + final DataStoreType dataStoreType = input.getDataStoreType(); + if (dataStoreType == null) { + return newFailedRpcResultFuture("A valid DataStoreType must be specified"); + } + + LOG.info("Adding replica for shard {}, datastore type {}", identifier, dataStoreType); + + final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier); + final SettableFuture> returnFuture = SettableFuture.create(); + ListenableFuture future = sendMessageToShardManager(dataStoreType, new AddPrefixShardReplica(prefix)); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Success success) { + LOG.info("Successfully added replica for shard {}", prefix); + returnFuture.set(newSuccessfulResult()); + } + + @Override + public void onFailure(Throwable failure) { + onMessageFailure(String.format("Failed to add replica for shard %s", prefix), + returnFuture, failure); + } + }); + + return returnFuture; + } + + @Override + public Future> removePrefixShardReplica(final RemovePrefixShardReplicaInput input) { + + final InstanceIdentifier identifier = input.getShardPrefix(); + if (identifier == null) { + return newFailedRpcResultFuture("A valid shard identifier must be specified"); + } + + final DataStoreType dataStoreType = input.getDataStoreType(); + if (dataStoreType == null) { + return newFailedRpcResultFuture("A valid DataStoreType must be specified"); + } + + final String memberName = input.getMemberName(); + if (Strings.isNullOrEmpty(memberName)) { + return newFailedRpcResultFuture("A valid member name must be specified"); + } + + LOG.info("Removing replica for shard {} memberName {}, datastoreType {}", + identifier, memberName, dataStoreType); + final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier); + + final SettableFuture> returnFuture = SettableFuture.create(); + final ListenableFuture future = sendMessageToShardManager(dataStoreType, + new RemovePrefixShardReplica(prefix, MemberName.forName(memberName))); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(final Success success) { + LOG.info("Successfully removed replica for shard {}", prefix); + returnFuture.set(newSuccessfulResult()); + } + + @Override + public void onFailure(final Throwable failure) { + onMessageFailure(String.format("Failed to remove replica for shard %s", prefix), + returnFuture, failure); + } + }); + + return returnFuture; + } + @Override public Future> addReplicasForAllShards() { LOG.info("Adding replicas for all shards"); diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/resources/org/opendaylight/blueprint/cluster-admin.xml b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/resources/org/opendaylight/blueprint/cluster-admin.xml index 85edd2ef0b..258bc09955 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/resources/org/opendaylight/blueprint/cluster-admin.xml +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/resources/org/opendaylight/blueprint/cluster-admin.xml @@ -1,6 +1,7 @@ + xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0" + odl:use-default-for-reference-types="true"> @@ -10,9 +11,12 @@ + + + diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java index dee0e98442..5cca07428f 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -38,6 +38,7 @@ import java.net.URI; import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -49,6 +50,7 @@ import org.apache.commons.lang3.SerializationUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.DatastoreContext; @@ -56,9 +58,11 @@ import org.opendaylight.controller.cluster.datastore.MemberNode; import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; +import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; @@ -66,9 +70,16 @@ import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; +import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder; @@ -79,10 +90,13 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -121,7 +135,7 @@ public class ClusterAdminRpcServiceTest { String fileName = "target/testBackupDatastore"; new File(fileName).delete(); - ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore()); + ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null); RpcResult rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder() .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS); @@ -166,6 +180,58 @@ public class ClusterAdminRpcServiceTest { assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames); } + @Test + public void testAddRemovePrefixShardReplica() throws Exception { + String name = "testAddPrefixShardReplica"; + String moduleShardsConfig = "module-shards-default.conf"; + + final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) + .moduleShardsConfig(moduleShardsConfig).build(); + final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) + .moduleShardsConfig(moduleShardsConfig).build(); + final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) + .moduleShardsConfig(moduleShardsConfig).build(); + + member1.waitForMembersUp("member-2", "member-3"); + replicaNode2.kit().waitForMembersUp("member-1", "member-3"); + replicaNode3.kit().waitForMembersUp("member-1", "member-2"); + + final ActorRef shardManager1 = member1.configDataStore().getActorContext().getShardManager(); + + shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH), + "prefix", Collections.singleton(MEMBER_1))), + ActorRef.noSender()); + + member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), + ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); + + final InstanceIdentifier identifier = InstanceIdentifier.create(Cars.class); + final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class); + Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier); + + addPrefixShardReplica(replicaNode2, identifier, serializer, + ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1"); + + addPrefixShardReplica(replicaNode3, identifier, serializer, + ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1", "member-2"); + + verifyRaftPeersPresent(member1.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), + "member-2", "member-3"); + + removePrefixShardReplica(member1, identifier, "member-3", serializer, + ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-2"); + + verifyNoShardPresent(replicaNode3.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); + verifyRaftPeersPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), + "member-1"); + + removePrefixShardReplica(member1, identifier, "member-2", serializer, + ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); + + verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); + } + @Test public void testAddShardReplica() throws Exception { String name = "testAddShardReplica"; @@ -234,7 +300,7 @@ public class ClusterAdminRpcServiceTest { .moduleShardsConfig("module-shards-cars-member-1.conf").build(); ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), - memberNode.operDataStore()); + memberNode.operDataStore(), null); RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder() .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); @@ -273,12 +339,53 @@ public class ClusterAdminRpcServiceTest { assertEquals("Data node", expCarsNode, optional.get()); } + private void addPrefixShardReplica(final MemberNode memberNode, + final InstanceIdentifier identifier, + final BindingNormalizedNodeSerializer serializer, + final String shardName, + final String... peerMemberNames) throws Exception { + + final AddPrefixShardReplicaInput input = new AddPrefixShardReplicaInputBuilder() + .setShardPrefix(identifier) + .setDataStoreType(DataStoreType.Config).build(); + + final ClusterAdminRpcService service = + new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer); + + final RpcResult rpcResult = service.addPrefixShardReplica(input).get(10, TimeUnit.SECONDS); + verifySuccessfulRpcResult(rpcResult); + + verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames); + Optional optional = memberNode.configDataStore().getActorContext().findLocalShard(shardName); + assertTrue("Replica shard not present", optional.isPresent()); + } + + private void removePrefixShardReplica(final MemberNode memberNode, + final InstanceIdentifier identifier, + final String removeFromMember, + final BindingNormalizedNodeSerializer serializer, + final String shardName, + final String... peerMemberNames) throws Exception { + final RemovePrefixShardReplicaInput input = new RemovePrefixShardReplicaInputBuilder() + .setDataStoreType(DataStoreType.Config) + .setShardPrefix(identifier) + .setMemberName(removeFromMember).build(); + + final ClusterAdminRpcService service = + new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer); + + final RpcResult rpcResult = service.removePrefixShardReplica(input).get(10, TimeUnit.SECONDS); + verifySuccessfulRpcResult(rpcResult); + + verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames); + } + private static void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames) throws Exception { memberNode.waitForMembersUp(peerMemberNames); ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), - memberNode.operDataStore()); + memberNode.operDataStore(), null); RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName) .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); @@ -340,7 +447,7 @@ public class ClusterAdminRpcServiceTest { // Invoke RPC service on member-3 to remove it's local shard ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), - replicaNode3.operDataStore()); + replicaNode3.operDataStore(), null); RpcResult rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder() .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build()) @@ -365,7 +472,7 @@ public class ClusterAdminRpcServiceTest { // Invoke RPC service on member-1 to remove member-2 ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), - leaderNode1.operDataStore()); + leaderNode1.operDataStore(), null); rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars") .setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); @@ -401,7 +508,7 @@ public class ClusterAdminRpcServiceTest { // Invoke RPC service on leader member-1 to remove it's local shard ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), - leaderNode1.operDataStore()); + leaderNode1.operDataStore(), null); RpcResult rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder() .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build()) @@ -448,7 +555,7 @@ public class ClusterAdminRpcServiceTest { newReplicaNode2.kit().expectMsgClass(Success.class); ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(), - newReplicaNode2.operDataStore()); + newReplicaNode2.operDataStore(), null); RpcResult rpcResult = service.addReplicasForAllShards().get(10, TimeUnit.SECONDS); @@ -506,7 +613,7 @@ public class ClusterAdminRpcServiceTest { verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2"); ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), - replicaNode3.operDataStore()); + replicaNode3.operDataStore(), null); RpcResult rpcResult = service3.removeAllShardReplicas( new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS); @@ -552,7 +659,7 @@ public class ClusterAdminRpcServiceTest { // Invoke RPC service on member-3 to change voting status ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), - replicaNode3.operDataStore()); + replicaNode3.operDataStore(), null); RpcResult rpcResult = service3 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder() @@ -586,7 +693,7 @@ public class ClusterAdminRpcServiceTest { // Invoke RPC service on member-3 to change voting status ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(), - leaderNode.operDataStore()); + leaderNode.operDataStore(), null); RpcResult rpcResult = service .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder() @@ -626,7 +733,7 @@ public class ClusterAdminRpcServiceTest { // Invoke RPC service on member-3 to change voting status ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), - replicaNode3.operDataStore()); + replicaNode3.operDataStore(), null); RpcResult rpcResult = service3.changeMemberVotingStatesForAllShards( new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of( @@ -678,7 +785,7 @@ public class ClusterAdminRpcServiceTest { new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE)); ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), - replicaNode3.operDataStore()); + replicaNode3.operDataStore(), null); RpcResult rpcResult = service3.flipMemberVotingStatesForAllShards() .get(10, TimeUnit.SECONDS); @@ -772,7 +879,7 @@ public class ClusterAdminRpcServiceTest { assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState())); ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(), - replicaNode1.operDataStore()); + replicaNode1.operDataStore(), null); RpcResult rpcResult = service1.flipMemberVotingStatesForAllShards() .get(10, TimeUnit.SECONDS); @@ -839,7 +946,7 @@ public class ClusterAdminRpcServiceTest { new SimpleEntry<>("member-6", FALSE)); ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), - leaderNode1.operDataStore()); + leaderNode1.operDataStore(), null); RpcResult rpcResult = service1.flipMemberVotingStatesForAllShards() .get(10, TimeUnit.SECONDS); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AddPrefixShardReplica.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AddPrefixShardReplica.java new file mode 100644 index 0000000000..05e1271dd0 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AddPrefixShardReplica.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +/** + * A message sent to the ShardManager to dynamically add a new local shard + * that is a replica for an existing prefix shard that is already available + * in the cluster. + */ +public class AddPrefixShardReplica { + + private final YangInstanceIdentifier prefix; + + /** + * Constructor. + * + * @param prefix prefix of the shard that is to be locally replicated. + */ + + public AddPrefixShardReplica(@Nonnull final YangInstanceIdentifier prefix) { + this.prefix = Preconditions.checkNotNull(prefix, "prefix should not be null"); + } + + public YangInstanceIdentifier getShardPrefix() { + return this.prefix; + } + + @Override + public String toString() { + return "AddPrefixShardReplica[prefix=" + prefix + "]"; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemovePrefixShardReplica.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemovePrefixShardReplica.java new file mode 100644 index 0000000000..9c33cf0810 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemovePrefixShardReplica.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +/** + * A message sent to the ShardManager to dynamically remove a local prefix shard + * replica available in this node. + */ +public class RemovePrefixShardReplica { + + private final YangInstanceIdentifier prefix; + private final MemberName memberName; + + /** + * Constructor. + * + * @param prefix prefix of the local shard that is to be dynamically removed. + */ + public RemovePrefixShardReplica(@Nonnull final YangInstanceIdentifier prefix, + @Nonnull final MemberName memberName) { + this.prefix = Preconditions.checkNotNull(prefix, "prefix should not be null"); + this.memberName = Preconditions.checkNotNull(memberName, "memberName should not be null"); + } + + public YangInstanceIdentifier getShardPrefix() { + return prefix; + } + + public MemberName getMemberName() { + return memberName; + } + + @Override + public String toString() { + return "RemovePrefixShardReplica [prefix=" + prefix + ", memberName=" + memberName + "]"; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index 5bfa1851ee..4ae1540776 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -64,6 +64,7 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; +import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; @@ -75,6 +76,7 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary; import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; @@ -108,6 +110,7 @@ import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -249,6 +252,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onCreateShard((CreateShard)message); } else if (message instanceof AddShardReplica) { onAddShardReplica((AddShardReplica) message); + } else if (message instanceof AddPrefixShardReplica) { + onAddPrefixShardReplica((AddPrefixShardReplica) message); } else if (message instanceof PrefixShardCreated) { onPrefixShardCreated((PrefixShardCreated) message); } else if (message instanceof PrefixShardRemoved) { @@ -264,6 +269,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure); } else if (message instanceof RemoveShardReplica) { onRemoveShardReplica((RemoveShardReplica) message); + } else if (message instanceof RemovePrefixShardReplica) { + onRemovePrefixShardReplica((RemovePrefixShardReplica) message); } else if (message instanceof WrappedShardResponse) { onWrappedShardResponse((WrappedShardResponse) message); } else if (message instanceof GetSnapshot) { @@ -374,6 +381,46 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName, + final String primaryPath, final ActorRef sender) { + if (isShardReplicaOperationInProgress(shardName, sender)) { + return; + } + + shardReplicaOperationsInProgress.add(shardName); + + final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName); + + final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build(); + + //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message + LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(), + primaryPath, shardId); + + Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration()); + Future futureObj = ask(getContext().actorSelection(primaryPath), + new RemoveServer(shardId.toString()), removeServerTimeout); + + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + shardReplicaOperationsInProgress.remove(shardName); + String msg = String.format("RemoveServer request to leader %s for shard %s failed", + primaryPath, shardName); + + LOG.debug("{}: {}", persistenceId(), msg, failure); + + // FAILURE + sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); + } else { + // SUCCESS + self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender); + } + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath, final ActorRef sender) { if (isShardReplicaOperationInProgress(shardName, sender)) { @@ -486,8 +533,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { final PrefixShardConfiguration config = message.getConfiguration(); - final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), - config.getPrefix()); + final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), + ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier())); final String shardName = shardId.getShardName(); if (localShards.containsKey(shardName)) { @@ -532,7 +579,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message); final DOMDataTreeIdentifier prefix = message.getPrefix(); - final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix); + final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), + ClusterUtils.getCleanShardName(prefix.getRootIdentifier())); final ShardInformation shard = localShards.remove(shardId.getShardName()); configuration.removePrefixShardConfiguration(prefix); @@ -1188,6 +1236,37 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return false; } + private void onAddPrefixShardReplica(final AddPrefixShardReplica message) { + LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message); + + final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), + ClusterUtils.getCleanShardName(message.getShardPrefix())); + final String shardName = shardId.getShardName(); + + // Create the localShard + if (schemaContext == null) { + String msg = String.format( + "No SchemaContext is available in order to create a local shard instance for %s", shardName); + LOG.debug("{}: {}", persistenceId(), msg); + getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf()); + return; + } + + findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), + getSelf()) { + @Override + public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { + getSelf().tell((RunnableMessage) () -> addPrefixShard(getShardName(), message.getShardPrefix(), + response, getSender()), getTargetActor()); + } + + @Override + public void onLocalPrimaryFound(LocalPrimaryShardFound message) { + sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor()); + } + }); + } + private void onAddShardReplica(final AddShardReplica shardReplicaMsg) { final String shardName = shardReplicaMsg.getShardName(); @@ -1232,6 +1311,39 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf()); } + private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix, + final RemotePrimaryShardFound response, final ActorRef sender) { + if (isShardReplicaOperationInProgress(shardName, sender)) { + return; + } + + shardReplicaOperationsInProgress.add(shardName); + + final ShardInformation shardInfo; + final boolean removeShardOnFailure; + ShardInformation existingShardInfo = localShards.get(shardName); + if (existingShardInfo == null) { + removeShardOnFailure = true; + ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); + + final Builder builder = newShardDatastoreContextBuilder(shardName); + builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); + + DatastoreContext datastoreContext = builder.build(); + + shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext, + Shard.builder(), peerAddressResolver); + shardInfo.setActiveMember(false); + localShards.put(shardName, shardInfo); + shardInfo.setActor(newShardActor(schemaContext, shardInfo)); + } else { + removeShardOnFailure = false; + shardInfo = existingShardInfo; + } + + execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender); + } + private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) { if (isShardReplicaOperationInProgress(shardName, sender)) { return; @@ -1259,16 +1371,26 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardInfo = existingShardInfo; } - String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName()); + execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender); + } + + private void execAddShard(final String shardName, + final ShardInformation shardInfo, + final RemotePrimaryShardFound response, + final boolean removeShardOnFailure, + final ActorRef sender) { + + final String localShardAddress = + peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName()); //inform ShardLeader to add this shard as a replica by sending an AddServer message LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(), response.getPrimaryPath(), shardInfo.getShardId()); - Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext() + final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext() .getShardLeaderElectionTimeout().duration()); - Future futureObj = ask(getContext().actorSelection(response.getPrimaryPath()), - new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout); + final Future futureObj = ask(getContext().actorSelection(response.getPrimaryPath()), + new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout); futureObj.onComplete(new OnComplete() { @Override @@ -1277,7 +1399,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(), response.getPrimaryPath(), shardName, failure); - String msg = String.format("AddServer request to leader %s for shard %s failed", + final String msg = String.format("AddServer request to leader %s for shard %s failed", response.getPrimaryPath(), shardName); self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender); } else { @@ -1379,6 +1501,32 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }); } + private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) { + LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message); + + final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), + ClusterUtils.getCleanShardName(message.getShardPrefix())); + final String shardName = shardId.getShardName(); + + findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), + shardName, persistenceId(), getSelf()) { + @Override + public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { + doRemoveShardReplicaAsync(response.getPrimaryPath()); + } + + @Override + public void onLocalPrimaryFound(LocalPrimaryShardFound response) { + doRemoveShardReplicaAsync(response.getPrimaryPath()); + } + + private void doRemoveShardReplicaAsync(final String primaryPath) { + getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(), + primaryPath, getSender()), getTargetActor()); + } + }); + } + private void persistShardList() { List shardList = new ArrayList<>(localShards.keySet()); for (ShardInformation shardInfo : localShards.values()) { -- 2.36.6