From 5f587c3e2bfabc09fec49463d04a6fbeba414e9c Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Sun, 14 Feb 2021 10:01:42 +0100 Subject: [PATCH 1/1] Remove DOMDataTreeProducer-related classes DOMDataTreeProducer is being removed by upstream, remove our constructs relying on it. JIRA: CONTROLLER-1977 Change-Id: Icbbcdd41ac0df3ee916538f18908edda21f23cd8 Signed-off-by: Robert Varga --- .../cluster/dom/api/CDSDataTreeProducer.java | 41 - .../cluster/dom/api/CDSShardAccess.java | 74 -- .../admin/ClusterAdminRpcServiceTest.java | 194 ---- .../actors/dds/ClientTransaction.java | 19 - .../actors/dds/ClientTransactionCursor.java | 81 -- .../datastore/shardmanager/ShardManager.java | 96 +- .../cluster/sharding/CDSShardAccessImpl.java | 214 ----- ...MDataTreeShardCreationFailedException.java | 28 - .../DistributedShardChangePublisher.java | 320 ------- .../sharding/DistributedShardFactory.java | 40 - .../sharding/DistributedShardFrontend.java | 141 --- .../DistributedShardModification.java | 74 -- .../DistributedShardModificationContext.java | 60 -- .../DistributedShardModificationCursor.java | 37 - .../DistributedShardModificationFactory.java | 52 -- ...ibutedShardModificationFactoryBuilder.java | 29 - .../DistributedShardRegistration.java | 26 - .../DistributedShardedDOMDataTree.java | 696 --------------- .../cluster/sharding/LookupTask.java | 53 -- .../OSGiDistributedShardedDOMDataTree.java | 93 -- .../PrefixedShardConfigUpdateHandler.java | 188 ---- .../sharding/PrefixedShardConfigWriter.java | 178 ---- .../sharding/RoleChangeListenerActor.java | 72 -- .../cluster/sharding/ShardProxyProducer.java | 63 -- .../sharding/ShardProxyTransaction.java | 215 ----- .../sharding/ShardedDataTreeActor.java | 829 ------------------ .../ShardingServiceAddressResolver.java | 69 -- .../sharding/messages/InitConfigListener.java | 23 - .../sharding/messages/LookupPrefixShard.java | 45 - .../messages/NotifyProducerCreated.java | 35 - .../messages/NotifyProducerRemoved.java | 35 - .../sharding/messages/PrefixShardCreated.java | 42 - .../messages/PrefixShardRemovalLookup.java | 35 - .../sharding/messages/PrefixShardRemoved.java | 35 - .../sharding/messages/ProducerCreated.java | 32 - .../sharding/messages/ProducerRemoved.java | 33 - .../messages/StartConfigShardLookup.java | 34 - .../dds/ClientTransactionCursorTest.java | 131 --- .../actors/dds/ClientTransactionTest.java | 16 - .../sharding/CDSShardAccessImplTest.java | 218 ----- .../DistributedShardFrontendTest.java | 213 ----- ...ributedShardedDOMDataTreeRemotingTest.java | 437 --------- .../DistributedShardedDOMDataTreeTest.java | 551 ------------ .../sharding/RoleChangeListenerActorTest.java | 58 -- .../provider/MdsalLowLevelTestProvider.java | 145 +-- .../impl/IdIntsDOMDataTreeLIstener.java | 110 --- .../it/provider/impl/PrefixLeaderHandler.java | 74 -- .../it/provider/impl/PrefixShardHandler.java | 211 ----- .../impl/ProduceTransactionsHandler.java | 163 ---- .../OSGI-INF/blueprint/cluster-test-app.xml | 4 - 50 files changed, 10 insertions(+), 6652 deletions(-) delete mode 100644 opendaylight/md-sal/cds-dom-api/src/main/java/org/opendaylight/controller/cluster/dom/api/CDSDataTreeProducer.java delete mode 100644 opendaylight/md-sal/cds-dom-api/src/main/java/org/opendaylight/controller/cluster/dom/api/CDSShardAccess.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCursor.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImpl.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DOMDataTreeShardCreationFailedException.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFactory.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModification.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationContext.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationCursor.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactory.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactoryBuilder.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardRegistration.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/LookupTask.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/OSGiDistributedShardedDOMDataTree.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigUpdateHandler.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigWriter.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActor.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyProducer.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyTransaction.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardingServiceAddressResolver.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/InitConfigListener.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/LookupPrefixShard.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/NotifyProducerCreated.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/NotifyProducerRemoved.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardCreated.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemovalLookup.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemoved.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/ProducerCreated.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/ProducerRemoved.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/StartConfigShardLookup.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCursorTest.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImplTest.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontendTest.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActorTest.java delete mode 100644 opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/IdIntsDOMDataTreeLIstener.java delete mode 100644 opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixLeaderHandler.java delete mode 100644 opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java delete mode 100644 opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java diff --git a/opendaylight/md-sal/cds-dom-api/src/main/java/org/opendaylight/controller/cluster/dom/api/CDSDataTreeProducer.java b/opendaylight/md-sal/cds-dom-api/src/main/java/org/opendaylight/controller/cluster/dom/api/CDSDataTreeProducer.java deleted file mode 100644 index 6ac2f920f6..0000000000 --- a/opendaylight/md-sal/cds-dom-api/src/main/java/org/opendaylight/controller/cluster/dom/api/CDSDataTreeProducer.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.dom.api; - -import com.google.common.annotations.Beta; -import org.eclipse.jdt.annotation.NonNull; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; - -/** - * An extension to {@link DOMDataTreeProducer}, which allows users access - * to information about the backing shard. - * - * @author Robert Varga - */ -@Beta -@Deprecated(forRemoval = true) -public interface CDSDataTreeProducer extends DOMDataTreeProducer { - /** - * Return a {@link CDSShardAccess} handle. This handle will remain valid - * as long as this producer is operational. Returned handle can be accessed - * independently from this producer and is not subject to the usual access - * restrictions imposed on DOMDataTreeProducer state. - * - * @param subtree One of the subtrees to which are currently under control of this producer - * @return A shard access handle. - * @throws NullPointerException when subtree is null - * @throws IllegalArgumentException if the specified subtree is not controlled by this producer - * @throws IllegalStateException if this producer is no longer operational - * @throws IllegalThreadStateException if the access rules to this producer - * are violated, for example if this producer is bound and this thread - * is currently not executing from a listener context. - */ - @NonNull CDSShardAccess getShardAccess(@NonNull DOMDataTreeIdentifier subtree); -} - diff --git a/opendaylight/md-sal/cds-dom-api/src/main/java/org/opendaylight/controller/cluster/dom/api/CDSShardAccess.java b/opendaylight/md-sal/cds-dom-api/src/main/java/org/opendaylight/controller/cluster/dom/api/CDSShardAccess.java deleted file mode 100644 index 12303935e0..0000000000 --- a/opendaylight/md-sal/cds-dom-api/src/main/java/org/opendaylight/controller/cluster/dom/api/CDSShardAccess.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.dom.api; - -import com.google.common.annotations.Beta; -import java.util.concurrent.CompletionStage; -import org.eclipse.jdt.annotation.NonNull; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; - -/** - * Unprivileged access interface to shard information. Provides read-only access to operational details about a CDS - * shard. - * - * @author Robert Varga - */ -@Beta -@Deprecated(forRemoval = true) -public interface CDSShardAccess { - /** - * Return the shard identifier. - * - * @return Shard identifier. - * @throws IllegalStateException if the {@link CDSDataTreeProducer} from which the associated - * {@link CDSDataTreeProducer} is no longer valid. - */ - @NonNull DOMDataTreeIdentifier getShardIdentifier(); - - /** - * Return the shard leader location relative to the local node. - * - * @return Shard leader location. - * @throws IllegalStateException if the {@link CDSDataTreeProducer} from which the associated - * {@link CDSDataTreeProducer} is no longer valid. - */ - @NonNull LeaderLocation getLeaderLocation(); - - /** - * Request the shard leader to be moved to the local node. The request will be evaluated against shard state and - * satisfied if leader movement is possible. If current shard policy or state prevents the movement from happening, - * the returned {@link CompletionStage} will report an exception. - * - *

- * This is a one-time operation, which does not prevent further movement happening in future. Even if this request - * succeeds, there is no guarantee that the leader will remain local in face of failures, shutdown or any future - * movement requests from other nodes. - * - *

- * Note that due to asynchronous nature of CDS, the leader may no longer be local by the time the returned - * {@link CompletionStage} reports success. - * - * @return A {@link CompletionStage} representing the request. - * @throws IllegalStateException if the {@link CDSDataTreeProducer} from which the associated - * {@link CDSDataTreeProducer} is no longer valid. - */ - @NonNull CompletionStage makeLeaderLocal(); - - /** - * Register a listener to shard location changes. Each listener object can be registered at most once. - * - * @param listener Listener object - * @return A {@link LeaderLocationListenerRegistration} for the listener. - * @throws IllegalArgumentException if the specified listener is already registered. - * @throws IllegalStateException if the {@link CDSDataTreeProducer} from which the associated - * {@link CDSDataTreeProducer} is no longer valid. - * @throws NullPointerException if listener is null. - */ - @NonNull LeaderLocationListenerRegistration registerLeaderLocationListener( - @NonNull L listener); -} diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java index 9703ebc487..1362db752e 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -50,7 +50,6 @@ import org.apache.commons.lang3.SerializationUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.DatastoreContext; @@ -58,11 +57,9 @@ import org.opendaylight.controller.cluster.datastore.MemberNode; import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; -import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; -import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; @@ -70,19 +67,9 @@ import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEnt import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; -import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; -import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.people.rev140818.People; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder; @@ -96,26 +83,16 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultKey; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -199,107 +176,6 @@ public class ClusterAdminRpcServiceTest { assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames); } - @Test - public void testAddRemovePrefixShardReplica() throws Exception { - String name = "testAddPrefixShardReplica"; - String moduleShardsConfig = "module-shards-default.conf"; - - final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) - .moduleShardsConfig(moduleShardsConfig).build(); - final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) - .moduleShardsConfig(moduleShardsConfig).build(); - final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) - .moduleShardsConfig(moduleShardsConfig).build(); - - member1.waitForMembersUp("member-2", "member-3"); - replicaNode2.kit().waitForMembersUp("member-1", "member-3"); - replicaNode3.kit().waitForMembersUp("member-1", "member-2"); - - final ActorRef shardManager1 = member1.configDataStore().getActorUtils().getShardManager(); - - shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH), - "prefix", Collections.singleton(MEMBER_1))), - ActorRef.noSender()); - - member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), - ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); - - final InstanceIdentifier identifier = InstanceIdentifier.create(Cars.class); - final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class); - Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier); - - addPrefixShardReplica(replicaNode2, identifier, serializer, - ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1"); - - addPrefixShardReplica(replicaNode3, identifier, serializer, - ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1", "member-2"); - - verifyRaftPeersPresent(member1.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), - "member-2", "member-3"); - - removePrefixShardReplica(member1, identifier, "member-3", serializer, - ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-2"); - - verifyNoShardPresent(replicaNode3.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); - verifyRaftPeersPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), - "member-1"); - - removePrefixShardReplica(member1, identifier, "member-2", serializer, - ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); - - verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); - } - - @Test - public void testGetShardRole() throws Exception { - String name = "testGetShardRole"; - String moduleShardsConfig = "module-shards-default-member-1.conf"; - - final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) - .moduleShardsConfig(moduleShardsConfig).build(); - - member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), "default"); - - final RpcResult successResult = - getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "default"); - verifySuccessfulRpcResult(successResult); - assertEquals("Leader", successResult.getResult().getRole()); - - final RpcResult failedResult = - getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "cars"); - - verifyFailedRpcResult(failedResult); - - final ActorRef shardManager1 = member1.configDataStore().getActorUtils().getShardManager(); - - shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH), - "prefix", Collections.singleton(MEMBER_1))), - ActorRef.noSender()); - - member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), - ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); - - final InstanceIdentifier identifier = InstanceIdentifier.create(Cars.class); - final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class); - Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier); - - final RpcResult prefixSuccessResult = - getPrefixShardRole(member1, identifier, serializer); - - verifySuccessfulRpcResult(prefixSuccessResult); - assertEquals("Leader", prefixSuccessResult.getResult().getRole()); - - final InstanceIdentifier peopleId = InstanceIdentifier.create(People.class); - Mockito.doReturn(PeopleModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(peopleId); - - final RpcResult prefixFail = - getPrefixShardRole(member1, peopleId, serializer); - - verifyFailedRpcResult(prefixFail); - } - @Test public void testGetPrefixShardRole() throws Exception { String name = "testGetPrefixShardRole"; @@ -309,8 +185,6 @@ public class ClusterAdminRpcServiceTest { .moduleShardsConfig(moduleShardsConfig).build(); member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), "default"); - - } @Test @@ -461,74 +335,6 @@ public class ClusterAdminRpcServiceTest { assertEquals("Data node", expCarsNode, optional.get()); } - private static RpcResult getShardRole(final MemberNode memberNode, - final BindingNormalizedNodeSerializer serializer, final String shardName) throws Exception { - - final GetShardRoleInput input = new GetShardRoleInputBuilder() - .setDataStoreType(DataStoreType.Config) - .setShardName(shardName) - .build(); - - final ClusterAdminRpcService service = - new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer); - - return service.getShardRole(input).get(10, TimeUnit.SECONDS); - } - - private static RpcResult getPrefixShardRole( - final MemberNode memberNode, - final InstanceIdentifier identifier, - final BindingNormalizedNodeSerializer serializer) throws Exception { - - final GetPrefixShardRoleInput input = new GetPrefixShardRoleInputBuilder() - .setDataStoreType(DataStoreType.Config) - .setShardPrefix(identifier) - .build(); - - final ClusterAdminRpcService service = - new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer); - - return service.getPrefixShardRole(input).get(10, TimeUnit.SECONDS); - } - - private static void addPrefixShardReplica(final MemberNode memberNode, final InstanceIdentifier identifier, - final BindingNormalizedNodeSerializer serializer, final String shardName, - final String... peerMemberNames) throws Exception { - - final AddPrefixShardReplicaInput input = new AddPrefixShardReplicaInputBuilder() - .setShardPrefix(identifier) - .setDataStoreType(DataStoreType.Config).build(); - - final ClusterAdminRpcService service = - new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer); - - final RpcResult rpcResult = service.addPrefixShardReplica(input) - .get(10, TimeUnit.SECONDS); - verifySuccessfulRpcResult(rpcResult); - - verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames); - Optional optional = memberNode.configDataStore().getActorUtils().findLocalShard(shardName); - assertTrue("Replica shard not present", optional.isPresent()); - } - - private static void removePrefixShardReplica(final MemberNode memberNode, final InstanceIdentifier identifier, - final String removeFromMember, final BindingNormalizedNodeSerializer serializer, final String shardName, - final String... peerMemberNames) throws Exception { - final RemovePrefixShardReplicaInput input = new RemovePrefixShardReplicaInputBuilder() - .setDataStoreType(DataStoreType.Config) - .setShardPrefix(identifier) - .setMemberName(removeFromMember).build(); - - final ClusterAdminRpcService service = - new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer); - - final RpcResult rpcResult = service.removePrefixShardReplica(input) - .get(10, TimeUnit.SECONDS); - verifySuccessfulRpcResult(rpcResult); - - verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames); - } - private static void doAddShardReplica(final MemberNode memberNode, final String shardName, final String... peerMemberNames) throws Exception { memberNode.waitForMembersUp(peerMemberNames); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java index 5a28e692aa..794a1be960 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java @@ -13,10 +13,7 @@ import com.google.common.collect.Iterables; import com.google.common.util.concurrent.FluentFuture; import java.util.Collection; import java.util.Optional; -import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeCursor; -import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -53,9 +50,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; */ @Beta public class ClientTransaction extends AbstractClientHandle { - - private ClientTransactionCursor cursor; - ClientTransaction(final AbstractClientHistory parent, final TransactionIdentifier transactionId) { super(parent, transactionId); } @@ -64,13 +58,6 @@ public class ClientTransaction extends AbstractClientHandle exists(final YangInstanceIdentifier path) { return ensureTransactionProxy(path).exists(path); } @@ -117,10 +104,4 @@ public class ClientTransaction extends AbstractClientHandle path) { - path.forEach(this::enter); - } - - @Override - public void exit() { - final YangInstanceIdentifier currentParent = current.getParent(); - checkState(currentParent != null); - current = currentParent; - } - - @Override - public void exit(final int depth) { - for (int i = 0; i < depth; ++i) { - exit(); - } - } - - @Override - public void close() { - parent.closeCursor(this); - } - - @Override - public void delete(final PathArgument child) { - parent.delete(current.node(child)); - } - - @Override - public void merge(final PathArgument child, final NormalizedNode data) { - parent.merge(current.node(child), data); - } - - @Override - public void write(final PathArgument child, final NormalizedNode data) { - parent.write(current.node(child), data); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index 52f642c98c..18bcadf151 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -53,7 +53,6 @@ import java.util.function.Supplier; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.common.actor.Dispatchers; -import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; @@ -109,10 +108,6 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; -import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler; -import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -176,9 +171,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Set> shardAvailabilityCallbacks = new HashSet<>(); private final String persistenceId; - private final AbstractDataStore dataStore; - - private PrefixedShardConfigUpdateHandler configUpdateHandler; ShardManager(final AbstractShardManagerCreator builder) { this.cluster = builder.getCluster(); @@ -203,8 +195,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { "shard-manager-" + this.type, datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType()); shardManagerMBean.registerMBean(); - - dataStore = builder.getDistributedDataStore(); } @Override @@ -259,12 +249,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onAddShardReplica((AddShardReplica) message); } else if (message instanceof AddPrefixShardReplica) { onAddPrefixShardReplica((AddPrefixShardReplica) message); - } else if (message instanceof PrefixShardCreated) { - onPrefixShardCreated((PrefixShardCreated) message); - } else if (message instanceof PrefixShardRemoved) { - onPrefixShardRemoved((PrefixShardRemoved) message); - } else if (message instanceof InitConfigListener) { - onInitConfigListener(); } else if (message instanceof ForwardedAddServerReply) { ForwardedAddServerReply msg = (ForwardedAddServerReply)message; onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, @@ -342,21 +326,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { getSender().tell(new GetShardRoleReply(shardInformation.getRole()), ActorRef.noSender()); } - private void onInitConfigListener() { - LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName()); - - final org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType = - org.opendaylight.mdsal.common.api.LogicalDatastoreType - .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name()); - - if (configUpdateHandler != null) { - configUpdateHandler.close(); - } - - configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName()); - configUpdateHandler.initListener(dataStore, datastoreType); - } - void onShutDown() { List> stopFutures = new ArrayList<>(localShards.size()); for (ShardInformation info : localShards.values()) { @@ -616,32 +585,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void onPrefixShardCreated(final PrefixShardCreated message) { - LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message); - - final PrefixShardConfiguration config = message.getConfiguration(); - final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier())); - final String shardName = shardId.getShardName(); - - if (isPreviousShardActorStopInProgress(shardName, message)) { - return; - } - - if (localShards.containsKey(shardName)) { - LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName); - final PrefixShardConfiguration existing = - configuration.getAllPrefixShardConfigurations().get(config.getPrefix()); - - if (existing != null && existing.equals(config)) { - // we don't have to do nothing here - return; - } - } - - doCreatePrefixShard(config, shardId, shardName); - } - + @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", + justification = "https://github.com/spotbugs/spotbugs/issues/811") private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) { final CompositeOnComplete stopOnComplete = shardActorsStopping.get(shardName); if (stopOnComplete == null) { @@ -662,43 +607,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return true; } - private void doCreatePrefixShard(final PrefixShardConfiguration config, final ShardIdentifier shardId, - final String shardName) { - configuration.addPrefixShardConfiguration(config); - - final Builder builder = newShardDatastoreContextBuilder(shardName); - builder.logicalStoreType(config.getPrefix().getDatastoreType()) - .storeRoot(config.getPrefix().getRootIdentifier()); - DatastoreContext shardDatastoreContext = builder.build(); - - final Map peerAddresses = getPeerAddresses(shardName); - final boolean isActiveMember = true; - - LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}", - persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember); - - final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses, - shardDatastoreContext, Shard.builder(), peerAddressResolver); - info.setActiveMember(isActiveMember); - localShards.put(info.getShardName(), info); - - if (schemaContext != null) { - info.setSchemaContext(schemaContext); - info.setActor(newShardActor(info)); - } - } - - private void onPrefixShardRemoved(final PrefixShardRemoved message) { - LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message); - - final DOMDataTreeIdentifier prefix = message.getPrefix(); - final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - ClusterUtils.getCleanShardName(prefix.getRootIdentifier())); - - configuration.removePrefixShardConfiguration(prefix); - removeShard(shardId); - } - private void doCreateShard(final CreateShard createShard) { final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig(); final String shardName = moduleShardConfig.getShardName(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImpl.java deleted file mode 100644 index 0e84d046d2..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImpl.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import static akka.actor.ActorRef.noSender; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; -import static java.util.Objects.requireNonNull; - -import akka.actor.ActorRef; -import akka.actor.PoisonPill; -import akka.dispatch.Futures; -import akka.dispatch.Mapper; -import akka.dispatch.OnComplete; -import akka.util.Timeout; -import java.util.Collection; -import java.util.Optional; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ConcurrentHashMap; -import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; -import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal; -import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; -import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; -import org.opendaylight.controller.cluster.dom.api.CDSShardAccess; -import org.opendaylight.controller.cluster.dom.api.LeaderLocation; -import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener; -import org.opendaylight.controller.cluster.dom.api.LeaderLocationListenerRegistration; -import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.compat.java8.FutureConverters; -import scala.concurrent.Future; - -/** - * Default {@link CDSShardAccess} implementation. Listens on leader location - * change events and distributes them to registered listeners. Also updates - * current information about leader location accordingly. - * - *

- * Sends {@link MakeLeaderLocal} message to local shards and translates its result - * on behalf users {@link #makeLeaderLocal()} calls. - * - *

- * {@link org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer} that - * creates instances of this class has to call {@link #close()} once it is no - * longer valid. - */ -@Deprecated(forRemoval = true) -final class CDSShardAccessImpl implements CDSShardAccess, LeaderLocationListener, AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(CDSShardAccessImpl.class); - - private final Collection listeners = ConcurrentHashMap.newKeySet(); - private final DOMDataTreeIdentifier prefix; - private final ActorUtils actorUtils; - private final Timeout makeLeaderLocalTimeout; - - private ActorRef roleChangeListenerActor; - - private volatile LeaderLocation currentLeader = LeaderLocation.UNKNOWN; - private volatile boolean closed = false; - - CDSShardAccessImpl(final DOMDataTreeIdentifier prefix, final ActorUtils actorUtils) { - this.prefix = requireNonNull(prefix); - this.actorUtils = requireNonNull(actorUtils); - this.makeLeaderLocalTimeout = - new Timeout(actorUtils.getDatastoreContext().getShardLeaderElectionTimeout().duration().$times(2)); - - // register RoleChangeListenerActor - // TODO Maybe we should do this in async - final Optional localShardReply = - actorUtils.findLocalShard(ClusterUtils.getCleanShardName(prefix.getRootIdentifier())); - checkState(localShardReply.isPresent(), - "Local shard for {} not present. Cannot register RoleChangeListenerActor", prefix); - roleChangeListenerActor = - actorUtils.getActorSystem().actorOf(RoleChangeListenerActor.props(localShardReply.get(), this)); - } - - private void checkNotClosed() { - checkState(!closed, "CDSDataTreeProducer, that this CDSShardAccess is associated with, is no longer valid"); - } - - @Override - public DOMDataTreeIdentifier getShardIdentifier() { - checkNotClosed(); - return prefix; - } - - @Override - public LeaderLocation getLeaderLocation() { - checkNotClosed(); - // TODO before getting first notification from roleChangeListenerActor - // we will always return UNKNOWN - return currentLeader; - } - - @Override - public CompletionStage makeLeaderLocal() { - // TODO when we have running make leader local operation - // we should just return the same completion stage - checkNotClosed(); - - // TODO can we cache local shard actorRef? - final Future localShardReply = - actorUtils.findLocalShardAsync(ClusterUtils.getCleanShardName(prefix.getRootIdentifier())); - - // we have to tell local shard to make leader local - final scala.concurrent.Promise makeLeaderLocalAsk = Futures.promise(); - localShardReply.onComplete(new OnComplete() { - @Override - public void onComplete(final Throwable failure, final ActorRef actorRef) { - if (failure instanceof LocalShardNotFoundException) { - LOG.debug("No local shard found for {} - Cannot request leadership transfer to local shard.", - getShardIdentifier(), failure); - makeLeaderLocalAsk.failure(failure); - } else if (failure != null) { - // TODO should this be WARN? - LOG.debug("Failed to find local shard for {} - Cannot request leadership transfer to local shard.", - getShardIdentifier(), failure); - makeLeaderLocalAsk.failure(failure); - } else { - makeLeaderLocalAsk - .completeWith(actorUtils - .executeOperationAsync(actorRef, MakeLeaderLocal.INSTANCE, makeLeaderLocalTimeout)); - } - } - }, actorUtils.getClientDispatcher()); - - // we have to transform make leader local request result - Future makeLeaderLocalFuture = makeLeaderLocalAsk.future() - .transform(new Mapper() { - @Override - public Void apply(final Object parameter) { - return null; - } - }, new Mapper() { - @Override - public Throwable apply(final Throwable parameter) { - if (parameter instanceof LeadershipTransferFailedException) { - // do nothing with exception and just pass it as it is - return parameter; - } - // wrap exception in LeadershipTransferFailedEx - return new LeadershipTransferFailedException("Leadership transfer failed", parameter); - } - }, actorUtils.getClientDispatcher()); - - return FutureConverters.toJava(makeLeaderLocalFuture); - } - - @Override - public LeaderLocationListenerRegistration - registerLeaderLocationListener(final L listener) { - checkNotClosed(); - requireNonNull(listener); - checkArgument(!listeners.contains(listener), "Listener %s is already registered with ShardAccess %s", listener, - this); - - LOG.debug("Registering LeaderLocationListener {}", listener); - - listeners.add(listener); - - return new LeaderLocationListenerRegistration<>() { - @Override - public L getInstance() { - return listener; - } - - @Override - public void close() { - listeners.remove(listener); - } - }; - } - - @Override - @SuppressWarnings("checkstyle:IllegalCatch") - public void onLeaderLocationChanged(final LeaderLocation location) { - if (closed) { - // we are closed already. Do not dispatch any new leader location - // change events. - return; - } - - LOG.debug("Received leader location change notification. New leader location: {}", location); - currentLeader = location; - listeners.forEach(listener -> { - try { - listener.onLeaderLocationChanged(location); - } catch (Exception e) { - LOG.warn("Ignoring uncaught exception thrown be LeaderLocationListener {} " - + "during processing leader location change {}", listener, location, e); - } - }); - } - - @Override - public void close() { - // TODO should we also remove all listeners? - LOG.debug("Closing {} ShardAccess", prefix); - closed = true; - - if (roleChangeListenerActor != null) { - // stop RoleChangeListenerActor - roleChangeListenerActor.tell(PoisonPill.getInstance(), noSender()); - roleChangeListenerActor = null; - } - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DOMDataTreeShardCreationFailedException.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DOMDataTreeShardCreationFailedException.java deleted file mode 100644 index f69bf06661..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DOMDataTreeShardCreationFailedException.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import com.google.common.annotations.Beta; -import org.eclipse.jdt.annotation.NonNull; - -/** - * Exception thrown when there was a at any point during the creation of a shard via {@link DistributedShardFactory}. - */ -@Beta -@Deprecated(forRemoval = true) -public class DOMDataTreeShardCreationFailedException extends Exception { - private static final long serialVersionUID = 1L; - - public DOMDataTreeShardCreationFailedException(final @NonNull String message) { - super(message); - } - - public DOMDataTreeShardCreationFailedException(final @NonNull String message, final @NonNull Throwable cause) { - super(message, cause); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java deleted file mode 100644 index db8e1edebd..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java +++ /dev/null @@ -1,320 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import static com.google.common.base.Preconditions.checkState; -import static java.util.Objects.requireNonNull; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; -import org.checkerframework.checker.lock.qual.GuardedBy; -import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; -import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; -import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration; -import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree; -import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode; -import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; -import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Deprecated(forRemoval = true) -public class DistributedShardChangePublisher - extends AbstractRegistrationTree> - implements DOMStoreTreeChangePublisher { - - private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class); - - private final DistributedDataStoreInterface distributedDataStore; - private final YangInstanceIdentifier shardPath; - - private final Map childShards; - - @GuardedBy("this") - private final DataTree dataTree; - - public DistributedShardChangePublisher(final DataStoreClient client, - final DistributedDataStoreInterface distributedDataStore, - final DOMDataTreeIdentifier prefix, - final Map childShards) { - this.distributedDataStore = distributedDataStore; - // TODO keeping the whole dataTree thats contained in subshards doesn't seem like a good idea - // maybe the whole listener logic would be better in the backend shards where we have direct access to the - // dataTree and wont have to cache it redundantly. - - final DataTreeConfiguration baseConfig; - switch (prefix.getDatastoreType()) { - case CONFIGURATION: - baseConfig = DataTreeConfiguration.DEFAULT_CONFIGURATION; - break; - case OPERATIONAL: - baseConfig = DataTreeConfiguration.DEFAULT_OPERATIONAL; - break; - default: - throw new UnsupportedOperationException("Unknown prefix type " + prefix.getDatastoreType()); - } - - this.dataTree = new InMemoryDataTreeFactory().create(new DataTreeConfiguration.Builder(baseConfig.getTreeType()) - .setMandatoryNodesValidation(baseConfig.isMandatoryNodesValidationEnabled()) - .setUniqueIndexes(baseConfig.isUniqueIndexEnabled()) - .setRootPath(prefix.getRootIdentifier()) - .build()); - - // XXX: can we guarantee that the root is present in the schemacontext? - this.dataTree.setEffectiveModelContext(distributedDataStore.getActorUtils().getSchemaContext()); - this.shardPath = prefix.getRootIdentifier(); - this.childShards = childShards; - } - - protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration registration) { - LOG.debug("Closing registration {}", registration); - } - - @Override - public AbstractDOMDataTreeChangeListenerRegistration - registerTreeChangeListener(final YangInstanceIdentifier path, final L listener) { - takeLock(); - try { - return setupListenerContext(path, listener); - } finally { - releaseLock(); - } - } - - private AbstractDOMDataTreeChangeListenerRegistration - setupListenerContext(final YangInstanceIdentifier listenerPath, final L listener) { - // we need to register the listener registration path based on the shards root - // we have to strip the shard path from the listener path and then register - YangInstanceIdentifier strippedIdentifier = listenerPath; - if (!shardPath.isEmpty()) { - strippedIdentifier = YangInstanceIdentifier.create(stripShardPath(shardPath, listenerPath)); - } - - final DOMDataTreeListenerWithSubshards subshardListener = - new DOMDataTreeListenerWithSubshards(strippedIdentifier, listener); - final AbstractDOMDataTreeChangeListenerRegistration reg = - setupContextWithoutSubshards(listenerPath, strippedIdentifier, subshardListener); - - for (final ChildShardContext maybeAffected : childShards.values()) { - if (listenerPath.contains(maybeAffected.getPrefix().getRootIdentifier())) { - // consumer has initialDataChangeEvent subshard somewhere on lower level - // register to the notification manager with snapshot and forward child notifications to parent - LOG.debug("Adding new subshard{{}} to listener at {}", maybeAffected.getPrefix(), listenerPath); - subshardListener.addSubshard(maybeAffected); - } else if (maybeAffected.getPrefix().getRootIdentifier().contains(listenerPath)) { - // bind path is inside subshard - // TODO can this happen? seems like in ShardedDOMDataTree we are - // already registering to the lowest shard possible - throw new UnsupportedOperationException("Listener should be registered directly " - + "into initialDataChangeEvent subshard"); - } - } - - return reg; - } - - private AbstractDOMDataTreeChangeListenerRegistration - setupContextWithoutSubshards(final YangInstanceIdentifier shardLookup, - final YangInstanceIdentifier listenerPath, - final DOMDataTreeListenerWithSubshards listener) { - - LOG.debug("Registering root listener full path: {}, path inside shard: {}", shardLookup, listenerPath); - - // register in the shard tree - final RegistrationTreeNode> node = - findNodeFor(listenerPath.getPathArguments()); - - // register listener in CDS - ListenerRegistration listenerReg = distributedDataStore - .registerProxyListener(shardLookup, listenerPath, listener); - - @SuppressWarnings("unchecked") - final AbstractDOMDataTreeChangeListenerRegistration registration = - new AbstractDOMDataTreeChangeListenerRegistration<>((L) listener) { - @Override - protected void removeRegistration() { - listener.close(); - DistributedShardChangePublisher.this.removeRegistration(node, this); - registrationRemoved(this); - listenerReg.close(); - } - }; - addRegistration(node, registration); - - return registration; - } - - private static Iterable stripShardPath(final YangInstanceIdentifier shardPath, - final YangInstanceIdentifier listenerPath) { - if (shardPath.isEmpty()) { - return listenerPath.getPathArguments(); - } - - final List listenerPathArgs = new ArrayList<>(listenerPath.getPathArguments()); - final Iterator shardIter = shardPath.getPathArguments().iterator(); - final Iterator listenerIter = listenerPathArgs.iterator(); - - while (shardIter.hasNext()) { - if (shardIter.next().equals(listenerIter.next())) { - listenerIter.remove(); - } else { - break; - } - } - - return listenerPathArgs; - } - - synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath, - final Collection changes) throws DataValidationFailedException { - final DataTreeModification modification = dataTree.takeSnapshot().newModification(); - for (final DataTreeCandidate change : changes) { - try { - DataTreeCandidates.applyToModification(modification, change); - } catch (SchemaValidationFailedException e) { - LOG.error("Validation failed", e); - } - } - - modification.ready(); - - final DataTreeCandidate candidate; - - dataTree.validate(modification); - - // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree - candidate = dataTree.prepare(modification); - dataTree.commit(candidate); - - - DataTreeCandidateNode modifiedChild = candidate.getRootNode(); - - for (final PathArgument pathArgument : listenerPath.getPathArguments()) { - modifiedChild = modifiedChild.getModifiedChild(pathArgument).orElse(null); - } - - - if (modifiedChild == null) { - modifiedChild = DataTreeCandidateNodes.empty(dataTree.getRootPath().getLastPathArgument()); - } - - return DataTreeCandidates.newDataTreeCandidate(dataTree.getRootPath(), modifiedChild); - } - - - private final class DOMDataTreeListenerWithSubshards implements DOMDataTreeChangeListener { - - private final YangInstanceIdentifier listenerPath; - private final DOMDataTreeChangeListener delegate; - private final Map> registrations = - new ConcurrentHashMap<>(); - - @GuardedBy("this") - private final Collection stashedDataTreeCandidates = new LinkedList<>(); - - DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath, - final DOMDataTreeChangeListener delegate) { - this.listenerPath = requireNonNull(listenerPath); - this.delegate = requireNonNull(delegate); - } - - @Override - public synchronized void onDataTreeChanged(final Collection changes) { - LOG.debug("Received data changed {}", changes); - - if (!stashedDataTreeCandidates.isEmpty()) { - LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates); - changes.addAll(stashedDataTreeCandidates); - stashedDataTreeCandidates.clear(); - } - - try { - applyChanges(listenerPath, changes); - } catch (final DataValidationFailedException e) { - // TODO should we fail here? What if stashed changes - // (changes from subshards) got ahead more than one generation - // from current shard. Than we can fail to apply this changes - // upon current data tree, but once we get respective changes - // from current shard, we can apply also changes from - // subshards. - // - // However, we can loose ability to notice and report some - // errors then. For example, we cannot detect potential lost - // changes from current shard. - LOG.error("Validation failed for modification built from changes {}, current data tree: {}", - changes, dataTree, e); - throw new RuntimeException("Notification validation failed", e); - } - - delegate.onDataTreeChanged(changes); - } - - synchronized void onDataTreeChanged(final YangInstanceIdentifier pathFromRoot, - final Collection changes) { - final YangInstanceIdentifier changeId = - YangInstanceIdentifier.create(stripShardPath(dataTree.getRootPath(), pathFromRoot)); - - final List newCandidates = changes.stream() - .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode())) - .collect(Collectors.toList()); - - try { - delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates))); - } catch (final DataValidationFailedException e) { - // We cannot apply changes from subshard to current data tree. - // Maybe changes from current shard haven't been applied to - // data tree yet. Postpone processing of these changes till we - // receive changes from current shard. - LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.", - pathFromRoot, changes, dataTree, e); - stashedDataTreeCandidates.addAll(newCandidates); - } - } - - void addSubshard(final ChildShardContext context) { - checkState(context.getShard() instanceof DOMStoreTreeChangePublisher, - "All subshards that are initialDataChangeEvent part of ListenerContext need to be listenable"); - - final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard(); - // since this is going into subshard we want to listen for ALL changes in the subshard - registrations.put(context.getPrefix().getRootIdentifier(), - listenableShard.registerTreeChangeListener( - context.getPrefix().getRootIdentifier(), changes -> onDataTreeChanged( - context.getPrefix().getRootIdentifier(), changes))); - } - - void close() { - for (final ListenerRegistration registration : registrations.values()) { - registration.close(); - } - registrations.clear(); - } - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFactory.java deleted file mode 100644 index c3c3a25a2b..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFactory.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import com.google.common.annotations.Beta; -import java.util.Collection; -import java.util.concurrent.CompletionStage; -import org.opendaylight.controller.cluster.access.concepts.MemberName; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException; - -/** - * A factory that handles addition of new clustered shard's based on a prefix. This factory is a QoL class that handles - * all the boilerplate that comes with registration of a new clustered shard into the system and creating the backend - * shard/replicas that come along with it. - */ -@Beta -@Deprecated(forRemoval = true) -public interface DistributedShardFactory { - /** - * Register a new shard that is rooted at the desired prefix with replicas on the provided members. - * Note to register a shard without replicas you still need to provide at least one Member for the shard. - * - * @param prefix Shard root - * @param replicaMembers Members that this shard is replicated on, has to have at least one Member even if the shard - * should not be replicated. - * @return A future that will be completed with a DistributedShardRegistration once the backend and frontend shards - * are spawned. - * @throws DOMDataTreeShardingConflictException If the initial check for a conflict on the local node fails, the - * sharding configuration won't be updated if this exception is thrown. - */ - CompletionStage - createDistributedShard(DOMDataTreeIdentifier prefix, Collection replicaMembers) - throws DOMDataTreeShardingConflictException; -} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java deleted file mode 100644 index 81322cba73..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import static com.google.common.base.Preconditions.checkArgument; -import static java.util.Objects.requireNonNull; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.checkerframework.checker.lock.qual.GuardedBy; -import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; -import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; -import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeShard; -import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext; -import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardProducer; -import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext; -import org.opendaylight.mdsal.dom.spi.shard.ReadableWriteableDOMDataTreeShard; -import org.opendaylight.mdsal.dom.spi.shard.SubshardProducerSpecification; -import org.opendaylight.mdsal.dom.spi.shard.WriteableDOMDataTreeShard; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Proxy implementation of a shard that creates forwarding producers to the backend shard. - */ -@Deprecated(forRemoval = true) -class DistributedShardFrontend implements ReadableWriteableDOMDataTreeShard { - - private static final Logger LOG = LoggerFactory.getLogger(DistributedShardFrontend.class); - - private final DataStoreClient client; - private final DOMDataTreeIdentifier shardRoot; - @GuardedBy("this") - private final Map childShards = new HashMap<>(); - @GuardedBy("this") - private final List producers = new ArrayList<>(); - - private final DistributedShardChangePublisher publisher; - - DistributedShardFrontend(final DistributedDataStoreInterface distributedDataStore, - final DataStoreClient client, - final DOMDataTreeIdentifier shardRoot) { - this.client = requireNonNull(client); - this.shardRoot = requireNonNull(shardRoot); - - publisher = new DistributedShardChangePublisher(client, requireNonNull(distributedDataStore), shardRoot, - childShards); - } - - @Override - public synchronized DOMDataTreeShardProducer createProducer(final Collection paths) { - for (final DOMDataTreeIdentifier prodPrefix : paths) { - checkArgument(shardRoot.contains(prodPrefix), "Prefix %s is not contained under shard root", prodPrefix, - paths); - } - - final ShardProxyProducer ret = - new ShardProxyProducer(shardRoot, paths, client, createModificationFactory(paths)); - producers.add(ret); - return ret; - } - - @Override - public synchronized void onChildAttached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) { - LOG.debug("{} : Child shard attached at {}", shardRoot, prefix); - checkArgument(child != this, "Attempted to attach child %s onto self", this); - addChildShard(prefix, child); - updateProducers(); - } - - @Override - public synchronized void onChildDetached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) { - LOG.debug("{} : Child shard detached at {}", shardRoot, prefix); - childShards.remove(prefix); - updateProducers(); - // TODO we should grab the dataTreeSnapshot that's in the shard and apply it to this shard - } - - private void addChildShard(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) { - checkArgument(child instanceof WriteableDOMDataTreeShard); - childShards.put(prefix, new ChildShardContext(prefix, (WriteableDOMDataTreeShard) child)); - } - - DistributedShardModificationFactory createModificationFactory(final Collection prefixes) { - // TODO this could be abstract - final Map affectedSubshards = new HashMap<>(); - - for (final DOMDataTreeIdentifier producerPrefix : prefixes) { - for (final ChildShardContext maybeAffected : childShards.values()) { - final DOMDataTreeIdentifier bindPath; - if (producerPrefix.contains(maybeAffected.getPrefix())) { - bindPath = maybeAffected.getPrefix(); - } else if (maybeAffected.getPrefix().contains(producerPrefix)) { - // Bound path is inside subshard - bindPath = producerPrefix; - } else { - continue; - } - - SubshardProducerSpecification spec = affectedSubshards.computeIfAbsent(maybeAffected.getPrefix(), - k -> new SubshardProducerSpecification(maybeAffected)); - spec.addPrefix(bindPath); - } - } - - final DistributedShardModificationFactoryBuilder builder = - new DistributedShardModificationFactoryBuilder(shardRoot); - for (final SubshardProducerSpecification spec : affectedSubshards.values()) { - final ForeignShardModificationContext foreignContext = - new ForeignShardModificationContext(spec.getPrefix(), spec.createProducer()); - builder.addSubshard(foreignContext); - builder.addSubshard(spec.getPrefix(), foreignContext); - } - - return builder.build(); - } - - private void updateProducers() { - for (final ShardProxyProducer producer : producers) { - producer.setModificationFactory(createModificationFactory(producer.getPrefixes())); - } - } - - @Override - public ListenerRegistration registerTreeChangeListener( - final YangInstanceIdentifier treeId, final L listener) { - return publisher.registerTreeChangeListener(treeId, listener); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModification.java deleted file mode 100644 index 25fab74716..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModification.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import static java.util.Objects.requireNonNull; - -import java.util.Map; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; -import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext; -import org.opendaylight.mdsal.dom.spi.shard.WritableNodeOperation; -import org.opendaylight.mdsal.dom.spi.shard.WriteCursorStrategy; -import org.opendaylight.mdsal.dom.spi.shard.WriteableModificationNode; -import org.opendaylight.mdsal.dom.spi.shard.WriteableNodeWithSubshard; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; - -/** - * Shard modification that consists of the whole shard context, provides cursors which correctly delegate to subshards - * if any are present. - */ -@Deprecated(forRemoval = true) -public class DistributedShardModification extends WriteableNodeWithSubshard { - - private final DistributedShardModificationContext context; - private final Map childShards; - - public DistributedShardModification(final DistributedShardModificationContext context, - final Map subshards, - final Map childShards) { - super(subshards); - this.context = requireNonNull(context); - this.childShards = requireNonNull(childShards); - } - - @Override - public PathArgument getIdentifier() { - return context.getIdentifier().getRootIdentifier().getLastPathArgument(); - } - - @Override - public WriteCursorStrategy createOperation(final DOMDataTreeWriteCursor parentCursor) { - return new WritableNodeOperation(this, context.cursor()) { - @Override - public void exit() { - throw new IllegalStateException("Can not exit data tree root"); - } - }; - } - - void cursorClosed() { - context.closeCursor(); - } - - DOMStoreThreePhaseCommitCohort seal() { - childShards.values().stream().filter(ForeignShardModificationContext::isModified) - .forEach(ForeignShardModificationContext::ready); - - return context.ready(); - } - - DOMDataTreeIdentifier getPrefix() { - return context.getIdentifier(); - } - - Map getChildShards() { - return childShards; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationContext.java deleted file mode 100644 index 5b3c5313a9..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationContext.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.sharding; - -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; - -/** - * The context for a single shards modification, keeps a ClientTransaction so it can route requests correctly. - */ -@Deprecated(forRemoval = true) -public class DistributedShardModificationContext { - - private final ClientTransaction transaction; - private final DOMDataTreeIdentifier identifier; - private DOMDataTreeWriteCursor cursor; - - public DistributedShardModificationContext(final ClientTransaction transaction, - final DOMDataTreeIdentifier identifier) { - this.transaction = transaction; - this.identifier = identifier; - } - - public DOMDataTreeIdentifier getIdentifier() { - return identifier; - } - - DOMDataTreeWriteCursor cursor() { - if (cursor == null) { - cursor = transaction.openCursor(); - } - - return cursor; - } - - DOMStoreThreePhaseCommitCohort ready() { - if (cursor != null) { - cursor.close(); - cursor = null; - } - - return transaction.ready(); - } - - void closeCursor() { - if (cursor != null) { - cursor.close(); - cursor = null; - } - } - -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationCursor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationCursor.java deleted file mode 100644 index ed771071c7..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationCursor.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import org.opendaylight.mdsal.dom.spi.shard.AbstractDataModificationCursor; -import org.opendaylight.mdsal.dom.spi.shard.WriteCursorStrategy; - -/** - * Internal cursor implementation consisting of WriteCursorStrategies which forwards writes to foreign modifications - * if any. - */ -@Deprecated(forRemoval = true) -public class DistributedShardModificationCursor extends AbstractDataModificationCursor { - - private final ShardProxyTransaction parent; - - public DistributedShardModificationCursor(final DistributedShardModification root, - final ShardProxyTransaction parent) { - super(root); - this.parent = parent; - } - - @Override - protected WriteCursorStrategy getRootOperation(final DistributedShardModification root) { - return root.createOperation(null); - } - - @Override - public void close() { - parent.cursorClosed(); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactory.java deleted file mode 100644 index f7bec4093b..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import static java.util.Objects.requireNonNull; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; -import java.util.Map; -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext; -import org.opendaylight.mdsal.dom.spi.shard.WriteableModificationNode; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; - -/** - * Factory for {@link DistributedShardModification}. - */ -@Deprecated(forRemoval = true) -public final class DistributedShardModificationFactory { - private final Map childShards; - private final Map children; - private final DOMDataTreeIdentifier root; - - DistributedShardModificationFactory(final DOMDataTreeIdentifier root, - final Map children, - final Map childShards) { - this.root = requireNonNull(root); - this.children = ImmutableMap.copyOf(children); - this.childShards = ImmutableMap.copyOf(childShards); - } - - @VisibleForTesting - Map getChildren() { - return children; - } - - @VisibleForTesting - Map getChildShards() { - return childShards; - } - - DistributedShardModification createModification(final ClientTransaction transaction) { - return new DistributedShardModification( - new DistributedShardModificationContext(transaction, root), children, childShards); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactoryBuilder.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactoryBuilder.java deleted file mode 100644 index 93e4d963a8..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactoryBuilder.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.spi.shard.AbstractShardModificationFactoryBuilder; - -/** - * Builder for {@link DistributedShardModificationFactory}. - */ -@Deprecated(forRemoval = true) -public class DistributedShardModificationFactoryBuilder - extends AbstractShardModificationFactoryBuilder { - - - public DistributedShardModificationFactoryBuilder(final DOMDataTreeIdentifier root) { - super(root); - } - - @Override - public DistributedShardModificationFactory build() { - return new DistributedShardModificationFactory(root, buildChildren(), childShards); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardRegistration.java deleted file mode 100644 index cd4407e662..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardRegistration.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import com.google.common.annotations.Beta; -import java.util.concurrent.CompletionStage; - -/** - * Registration of the CDS shard that allows you to remove the shard from the system by closing the registration. - * This removal is done asynchronously. - */ -@Beta -@Deprecated(forRemoval = true) -public interface DistributedShardRegistration { - - /** - * Removes the shard from the system, this removal is done asynchronously, the future completes once the - * backend shard is no longer present. - */ - CompletionStage close(); -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java deleted file mode 100644 index bb51203e4a..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java +++ /dev/null @@ -1,696 +0,0 @@ -/* - * Copyright (c) 2016, 2017 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import static akka.actor.ActorRef.noSender; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; -import static java.util.Objects.requireNonNull; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.PoisonPill; -import akka.actor.Props; -import akka.dispatch.Mapper; -import akka.dispatch.OnComplete; -import akka.pattern.Patterns; -import akka.util.Timeout; -import com.google.common.base.Throwables; -import com.google.common.collect.ClassToInstanceMap; -import com.google.common.collect.ForwardingObject; -import com.google.common.collect.ImmutableClassToInstanceMap; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; -import com.google.common.util.concurrent.Uninterruptibles; -import java.util.AbstractMap.SimpleEntry; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.EnumMap; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import org.checkerframework.checker.lock.qual.GuardedBy; -import org.opendaylight.controller.cluster.ActorSystemProvider; -import org.opendaylight.controller.cluster.access.concepts.MemberName; -import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; -import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor; -import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; -import org.opendaylight.controller.cluster.datastore.Shard; -import org.opendaylight.controller.cluster.datastore.config.Configuration; -import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; -import org.opendaylight.controller.cluster.datastore.messages.CreateShard; -import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy; -import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; -import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; -import org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer; -import org.opendaylight.controller.cluster.dom.api.CDSShardAccess; -import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator; -import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener; -import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup; -import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated; -import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved; -import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeListener; -import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException; -import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; -import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException; -import org.opendaylight.mdsal.dom.api.DOMDataTreeService; -import org.opendaylight.mdsal.dom.api.DOMDataTreeServiceExtension; -import org.opendaylight.mdsal.dom.api.DOMDataTreeShard; -import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException; -import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService; -import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration; -import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree; -import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable; -import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.prefix.shard.configuration.rev170110.PrefixShards; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.compat.java8.FutureConverters; -import scala.concurrent.Future; -import scala.concurrent.Promise; -import scala.concurrent.duration.FiniteDuration; - -/** - * A layer on top of DOMDataTreeService that distributes producer/shard registrations to remote nodes via - * {@link ShardedDataTreeActor}. Also provides QoL method for addition of prefix based clustered shard into the system. - */ -@Deprecated(forRemoval = true) -public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDataTreeShardingService, - DistributedShardFactory { - - private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTree.class); - - private static final int MAX_ACTOR_CREATION_RETRIES = 100; - private static final int ACTOR_RETRY_DELAY = 100; - private static final TimeUnit ACTOR_RETRY_TIME_UNIT = TimeUnit.MILLISECONDS; - private static final int LOOKUP_TASK_MAX_RETRIES = 100; - static final FiniteDuration SHARD_FUTURE_TIMEOUT_DURATION = - new FiniteDuration(LOOKUP_TASK_MAX_RETRIES * LOOKUP_TASK_MAX_RETRIES * 3, TimeUnit.SECONDS); - static final Timeout SHARD_FUTURE_TIMEOUT = new Timeout(SHARD_FUTURE_TIMEOUT_DURATION); - - static final String ACTOR_ID = "ShardedDOMDataTreeFrontend"; - - private final ShardedDOMDataTree shardedDOMDataTree; - private final ActorSystem actorSystem; - private final DistributedDataStoreInterface distributedOperDatastore; - private final DistributedDataStoreInterface distributedConfigDatastore; - - private final ActorRef shardedDataTreeActor; - private final MemberName memberName; - - @GuardedBy("shards") - private final DOMDataTreePrefixTable> shards = - DOMDataTreePrefixTable.create(); - - private final EnumMap> configurationShardMap = - new EnumMap<>(LogicalDatastoreType.class); - - private final EnumMap writerMap = - new EnumMap<>(LogicalDatastoreType.class); - - private final PrefixedShardConfigUpdateHandler updateHandler; - - public DistributedShardedDOMDataTree(final ActorSystemProvider actorSystemProvider, - final DistributedDataStoreInterface distributedOperDatastore, - final DistributedDataStoreInterface distributedConfigDatastore) { - this.actorSystem = requireNonNull(actorSystemProvider).getActorSystem(); - this.distributedOperDatastore = requireNonNull(distributedOperDatastore); - this.distributedConfigDatastore = requireNonNull(distributedConfigDatastore); - shardedDOMDataTree = new ShardedDOMDataTree(); - - shardedDataTreeActor = createShardedDataTreeActor(actorSystem, - new ShardedDataTreeActorCreator() - .setShardingService(this) - .setActorSystem(actorSystem) - .setClusterWrapper(distributedConfigDatastore.getActorUtils().getClusterWrapper()) - .setDistributedConfigDatastore(distributedConfigDatastore) - .setDistributedOperDatastore(distributedOperDatastore) - .setLookupTaskMaxRetries(LOOKUP_TASK_MAX_RETRIES), - ACTOR_ID); - - this.memberName = distributedConfigDatastore.getActorUtils().getCurrentMemberName(); - - updateHandler = new PrefixedShardConfigUpdateHandler(shardedDataTreeActor, - distributedConfigDatastore.getActorUtils().getCurrentMemberName()); - - LOG.debug("{} - Starting prefix configuration shards", memberName); - createPrefixConfigShard(distributedConfigDatastore); - createPrefixConfigShard(distributedOperDatastore); - } - - private static void createPrefixConfigShard(final DistributedDataStoreInterface dataStore) { - Configuration configuration = dataStore.getActorUtils().getConfiguration(); - Collection memberNames = configuration.getUniqueMemberNamesForAllShards(); - CreateShard createShardMessage = - new CreateShard(new ModuleShardConfiguration(PrefixShards.QNAME.getNamespace(), - "prefix-shard-configuration", ClusterUtils.PREFIX_CONFIG_SHARD_ID, ModuleShardStrategy.NAME, - memberNames), - Shard.builder(), dataStore.getActorUtils().getDatastoreContext()); - - dataStore.getActorUtils().getShardManager().tell(createShardMessage, noSender()); - } - - /** - * This will try to initialize prefix configuration shards upon their - * successful start. We need to create writers to these shards, so we can - * satisfy future {@link #createDistributedShard} and - * {@link #resolveShardAdditions} requests and update prefix configuration - * shards accordingly. - * - *

- * We also need to initialize listeners on these shards, so we can react - * on changes made on them by other cluster members or even by ourselves. - * - *

- * Finally, we need to be sure that default shards for both operational and - * configuration data stores are up and running and we have distributed - * shards frontend created for them. - * - *

- * This is intended to be invoked by blueprint as initialization method. - */ - public void init() { - // create our writers to the configuration - try { - LOG.debug("{} - starting config shard lookup.", memberName); - - // We have to wait for prefix config shards to be up and running - // so we can create datastore clients for them - handleConfigShardLookup().get(SHARD_FUTURE_TIMEOUT_DURATION.length(), SHARD_FUTURE_TIMEOUT_DURATION.unit()); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - throw new IllegalStateException("Prefix config shards not found", e); - } - - try { - LOG.debug("{}: Prefix configuration shards ready - creating clients", memberName); - configurationShardMap.put(LogicalDatastoreType.CONFIGURATION, - createDatastoreClient(ClusterUtils.PREFIX_CONFIG_SHARD_ID, - distributedConfigDatastore.getActorUtils())); - } catch (final DOMDataTreeShardCreationFailedException e) { - throw new IllegalStateException( - "Unable to create datastoreClient for config DS prefix configuration shard.", e); - } - - try { - configurationShardMap.put(LogicalDatastoreType.OPERATIONAL, - createDatastoreClient(ClusterUtils.PREFIX_CONFIG_SHARD_ID, - distributedOperDatastore.getActorUtils())); - - } catch (final DOMDataTreeShardCreationFailedException e) { - throw new IllegalStateException( - "Unable to create datastoreClient for oper DS prefix configuration shard.", e); - } - - writerMap.put(LogicalDatastoreType.CONFIGURATION, new PrefixedShardConfigWriter( - configurationShardMap.get(LogicalDatastoreType.CONFIGURATION).getKey())); - - writerMap.put(LogicalDatastoreType.OPERATIONAL, new PrefixedShardConfigWriter( - configurationShardMap.get(LogicalDatastoreType.OPERATIONAL).getKey())); - - updateHandler.initListener(distributedConfigDatastore, LogicalDatastoreType.CONFIGURATION); - updateHandler.initListener(distributedOperDatastore, LogicalDatastoreType.OPERATIONAL); - - distributedConfigDatastore.getActorUtils().getShardManager().tell(InitConfigListener.INSTANCE, noSender()); - distributedOperDatastore.getActorUtils().getShardManager().tell(InitConfigListener.INSTANCE, noSender()); - - - //create shard registration for DEFAULT_SHARD - initDefaultShard(LogicalDatastoreType.CONFIGURATION); - initDefaultShard(LogicalDatastoreType.OPERATIONAL); - } - - private ListenableFuture> handleConfigShardLookup() { - - final ListenableFuture configFuture = lookupConfigShard(LogicalDatastoreType.CONFIGURATION); - final ListenableFuture operFuture = lookupConfigShard(LogicalDatastoreType.OPERATIONAL); - - return Futures.allAsList(configFuture, operFuture); - } - - private ListenableFuture lookupConfigShard(final LogicalDatastoreType type) { - final SettableFuture future = SettableFuture.create(); - - final Future ask = - Patterns.ask(shardedDataTreeActor, new StartConfigShardLookup(type), SHARD_FUTURE_TIMEOUT); - - ask.onComplete(new OnComplete<>() { - @Override - public void onComplete(final Throwable throwable, final Object result) { - if (throwable != null) { - future.setException(throwable); - } else { - future.set(null); - } - } - }, actorSystem.dispatcher()); - - return future; - } - - @Override - public ListenerRegistration registerListener( - final T listener, final Collection subtrees, - final boolean allowRxMerges, final Collection producers) - throws DOMDataTreeLoopException { - return shardedDOMDataTree.registerListener(listener, subtrees, allowRxMerges, producers); - } - - @Override - public ClassToInstanceMap getExtensions() { - return ImmutableClassToInstanceMap.of(); - } - - @Override - public DOMDataTreeProducer createProducer(final Collection subtrees) { - LOG.debug("{} - Creating producer for {}", memberName, subtrees); - final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(subtrees); - - final Object response = distributedConfigDatastore.getActorUtils() - .executeOperation(shardedDataTreeActor, new ProducerCreated(subtrees)); - if (response == null) { - LOG.debug("{} - Received success from remote nodes, creating producer:{}", memberName, subtrees); - return new ProxyProducer(producer, subtrees, shardedDataTreeActor, - distributedConfigDatastore.getActorUtils(), shards); - } - - closeProducer(producer); - - if (response instanceof Throwable) { - Throwables.throwIfUnchecked((Throwable) response); - throw new RuntimeException((Throwable) response); - } - throw new RuntimeException("Unexpected response to create producer received." + response); - } - - @Override - public CompletionStage createDistributedShard( - final DOMDataTreeIdentifier prefix, final Collection replicaMembers) - throws DOMDataTreeShardingConflictException { - - synchronized (shards) { - final DOMDataTreePrefixTableEntry> lookup = - shards.lookup(prefix); - if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) { - throw new DOMDataTreeShardingConflictException( - "Prefix " + prefix + " is already occupied by another shard."); - } - } - - final PrefixedShardConfigWriter writer = writerMap.get(prefix.getDatastoreType()); - - final ListenableFuture writeFuture = - writer.writeConfig(prefix.getRootIdentifier(), replicaMembers); - - final Promise shardRegistrationPromise = akka.dispatch.Futures.promise(); - Futures.addCallback(writeFuture, new FutureCallback() { - @Override - public void onSuccess(final Void result) { - - final Future ask = - Patterns.ask(shardedDataTreeActor, new LookupPrefixShard(prefix), SHARD_FUTURE_TIMEOUT); - - shardRegistrationPromise.completeWith(ask.transform( - new Mapper() { - @Override - public DistributedShardRegistration apply(final Object parameter) { - return new DistributedShardRegistrationImpl( - prefix, shardedDataTreeActor, DistributedShardedDOMDataTree.this); - } - }, - new Mapper() { - @Override - public Throwable apply(final Throwable throwable) { - return new DOMDataTreeShardCreationFailedException( - "Unable to create a cds shard.", throwable); - } - }, actorSystem.dispatcher())); - } - - @Override - public void onFailure(final Throwable throwable) { - shardRegistrationPromise.failure( - new DOMDataTreeShardCreationFailedException("Unable to create a cds shard.", throwable)); - } - }, MoreExecutors.directExecutor()); - - return FutureConverters.toJava(shardRegistrationPromise.future()); - } - - void resolveShardAdditions(final Set additions) { - LOG.debug("{}: Resolving additions : {}", memberName, additions); - // we need to register the shards from top to bottom, so we need to atleast make sure the ordering reflects that - additions - .stream() - .sorted(Comparator.comparingInt(o -> o.getRootIdentifier().getPathArguments().size())) - .forEachOrdered(this::createShardFrontend); - } - - void resolveShardRemovals(final Set removals) { - LOG.debug("{}: Resolving removals : {}", memberName, removals); - - // do we need to go from bottom to top? - removals.forEach(this::despawnShardFrontend); - } - - private void createShardFrontend(final DOMDataTreeIdentifier prefix) { - LOG.debug("{}: Creating CDS shard for prefix: {}", memberName, prefix); - final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier()); - final DistributedDataStoreInterface distributedDataStore = - prefix.getDatastoreType().equals(LogicalDatastoreType.CONFIGURATION) - ? distributedConfigDatastore : distributedOperDatastore; - - try (DOMDataTreeProducer producer = localCreateProducer(Collections.singletonList(prefix))) { - final Entry entry = - createDatastoreClient(shardName, distributedDataStore.getActorUtils()); - - final DistributedShardFrontend shard = - new DistributedShardFrontend(distributedDataStore, entry.getKey(), prefix); - - final DOMDataTreeShardRegistration reg = - shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer); - - synchronized (shards) { - shards.store(prefix, reg); - } - - } catch (final DOMDataTreeShardingConflictException e) { - LOG.error("{}: Prefix {} is already occupied by another shard", - distributedConfigDatastore.getActorUtils().getClusterWrapper().getCurrentMemberName(), prefix, e); - } catch (DOMDataTreeProducerException e) { - LOG.error("Unable to close producer", e); - } catch (DOMDataTreeShardCreationFailedException e) { - LOG.error("Unable to create datastore client for shard {}", prefix, e); - } - } - - private void despawnShardFrontend(final DOMDataTreeIdentifier prefix) { - LOG.debug("{}: Removing CDS shard for prefix: {}", memberName, prefix); - final DOMDataTreePrefixTableEntry> lookup; - synchronized (shards) { - lookup = shards.lookup(prefix); - } - - if (lookup == null || !lookup.getValue().getPrefix().equals(prefix)) { - LOG.debug("{}: Received despawn for non-existing CDS shard frontend, prefix: {}, ignoring..", - memberName, prefix); - return; - } - - lookup.getValue().close(); - // need to remove from our local table thats used for tracking - synchronized (shards) { - shards.remove(prefix); - } - - final PrefixedShardConfigWriter writer = writerMap.get(prefix.getDatastoreType()); - final ListenableFuture future = writer.removeConfig(prefix.getRootIdentifier()); - - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(final Void result) { - LOG.debug("{} - Succesfuly removed shard for {}", memberName, prefix); - } - - @Override - public void onFailure(final Throwable throwable) { - LOG.error("Removal of shard {} from configuration failed.", prefix, throwable); - } - }, MoreExecutors.directExecutor()); - } - - DOMDataTreePrefixTableEntry> lookupShardFrontend( - final DOMDataTreeIdentifier prefix) { - synchronized (shards) { - return shards.lookup(prefix); - } - } - - DOMDataTreeProducer localCreateProducer(final Collection prefix) { - return shardedDOMDataTree.createProducer(prefix); - } - - @Override - public ListenerRegistration registerDataTreeShard( - final DOMDataTreeIdentifier prefix, final T shard, final DOMDataTreeProducer producer) - throws DOMDataTreeShardingConflictException { - - LOG.debug("Registering shard[{}] at prefix: {}", shard, prefix); - - if (producer instanceof ProxyProducer) { - return shardedDOMDataTree.registerDataTreeShard(prefix, shard, ((ProxyProducer) producer).delegate()); - } - - return shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer); - } - - @SuppressWarnings("checkstyle:IllegalCatch") - private Entry createDatastoreClient(final String shardName, final ActorUtils actorUtils) - throws DOMDataTreeShardCreationFailedException { - - LOG.debug("{}: Creating distributed datastore client for shard {}", memberName, shardName); - final Props distributedDataStoreClientProps = - SimpleDataStoreClientActor.props(memberName, "Shard-" + shardName, actorUtils, shardName); - - final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps); - try { - return new SimpleEntry<>(SimpleDataStoreClientActor - .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS), clientActor); - } catch (final Exception e) { - LOG.error("{}: Failed to get actor for {}", distributedDataStoreClientProps, memberName, e); - clientActor.tell(PoisonPill.getInstance(), noSender()); - throw new DOMDataTreeShardCreationFailedException( - "Unable to create datastore client for shard{" + shardName + "}", e); - } - } - - @SuppressWarnings("checkstyle:IllegalCatch") - private void initDefaultShard(final LogicalDatastoreType logicalDatastoreType) { - - final PrefixedShardConfigWriter writer = writerMap.get(logicalDatastoreType); - - if (writer.checkDefaultIsPresent()) { - LOG.debug("{}: Default shard for {} is already present in the config. Possibly saved in snapshot.", - memberName, logicalDatastoreType); - } else { - try { - // Currently the default shard configuration is present in the out-of-box modules.conf and is - // expected to be present. So look up the local default shard here and create the frontend. - - // TODO we don't have to do it for config and operational default shard separately. Just one of them - // should be enough - final ActorUtils actorUtils = logicalDatastoreType == LogicalDatastoreType.CONFIGURATION - ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils(); - - final Optional defaultLocalShardOptional = - actorUtils.findLocalShard(ClusterUtils.getCleanShardName(YangInstanceIdentifier.empty())); - - if (defaultLocalShardOptional.isPresent()) { - LOG.debug("{}: Default shard for {} is already started, creating just frontend", memberName, - logicalDatastoreType); - createShardFrontend(new DOMDataTreeIdentifier(logicalDatastoreType, - YangInstanceIdentifier.empty())); - } - - // The local shard isn't present - we assume that means the local member isn't in the replica list - // and will be dynamically created later via an explicit add-shard-replica request. This is the - // bootstrapping mechanism to add a new node into an existing cluster. The following code to create - // the default shard as a prefix shard is problematic in this scenario so it is commented out. Since - // the default shard is a module-based shard by default, it makes sense to always treat it as such, - // ie bootstrap it in the same manner as the special prefix-configuration and EOS shards. -// final Collection names = distributedConfigDatastore.getActorUtils().getConfiguration() -// .getUniqueMemberNamesForAllShards(); -// Await.result(FutureConverters.toScala(createDistributedShard( -// new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.empty()), names)), -// SHARD_FUTURE_TIMEOUT_DURATION); -// } catch (DOMDataTreeShardingConflictException e) { -// LOG.debug("{}: Default shard for {} already registered, possibly due to other node doing it faster", -// memberName, logicalDatastoreType); - } catch (Exception e) { - LOG.error("{}: Default shard initialization for {} failed", memberName, logicalDatastoreType, e); - throw new RuntimeException(e); - } - } - } - - private static void closeProducer(final DOMDataTreeProducer producer) { - try { - producer.close(); - } catch (final DOMDataTreeProducerException e) { - LOG.error("Unable to close producer", e); - } - } - - @SuppressWarnings("checkstyle:IllegalCatch") - private static ActorRef createShardedDataTreeActor(final ActorSystem actorSystem, - final ShardedDataTreeActorCreator creator, - final String shardDataTreeActorId) { - Exception lastException = null; - - for (int i = 0; i < MAX_ACTOR_CREATION_RETRIES; i++) { - try { - return actorSystem.actorOf(creator.props(), shardDataTreeActorId); - } catch (final Exception e) { - lastException = e; - Uninterruptibles.sleepUninterruptibly(ACTOR_RETRY_DELAY, ACTOR_RETRY_TIME_UNIT); - LOG.debug("Could not create actor {} because of {} -" - + " waiting for sometime before retrying (retry count = {})", - shardDataTreeActorId, e.getMessage(), i); - } - } - - throw new IllegalStateException("Failed to create actor for ShardedDOMDataTree", lastException); - } - - private class DistributedShardRegistrationImpl implements DistributedShardRegistration { - - private final DOMDataTreeIdentifier prefix; - private final ActorRef shardedDataTreeActor; - private final DistributedShardedDOMDataTree distributedShardedDOMDataTree; - - DistributedShardRegistrationImpl(final DOMDataTreeIdentifier prefix, - final ActorRef shardedDataTreeActor, - final DistributedShardedDOMDataTree distributedShardedDOMDataTree) { - this.prefix = prefix; - this.shardedDataTreeActor = shardedDataTreeActor; - this.distributedShardedDOMDataTree = distributedShardedDOMDataTree; - } - - @Override - public CompletionStage close() { - // first despawn on the local node - distributedShardedDOMDataTree.despawnShardFrontend(prefix); - // update the config so the remote nodes are updated - final Future ask = - Patterns.ask(shardedDataTreeActor, new PrefixShardRemovalLookup(prefix), SHARD_FUTURE_TIMEOUT); - - final Future closeFuture = ask.transform( - new Mapper() { - @Override - public Void apply(final Object parameter) { - return null; - } - }, - new Mapper() { - @Override - public Throwable apply(final Throwable throwable) { - return throwable; - } - }, actorSystem.dispatcher()); - - return FutureConverters.toJava(closeFuture); - } - } - - // TODO what about producers created by this producer? - // They should also be CDSProducers - private static final class ProxyProducer extends ForwardingObject implements CDSDataTreeProducer { - - private final DOMDataTreeProducer delegate; - private final Collection subtrees; - private final ActorRef shardDataTreeActor; - private final ActorUtils actorUtils; - @GuardedBy("shardAccessMap") - private final Map shardAccessMap = new HashMap<>(); - - // We don't have to guard access to shardTable in ProxyProducer. - // ShardTable's entries relevant to this ProxyProducer shouldn't - // change during producer's lifetime. - private final DOMDataTreePrefixTable> shardTable; - - ProxyProducer(final DOMDataTreeProducer delegate, - final Collection subtrees, - final ActorRef shardDataTreeActor, - final ActorUtils actorUtils, - final DOMDataTreePrefixTable> shardLayout) { - this.delegate = requireNonNull(delegate); - this.subtrees = requireNonNull(subtrees); - this.shardDataTreeActor = requireNonNull(shardDataTreeActor); - this.actorUtils = requireNonNull(actorUtils); - this.shardTable = requireNonNull(shardLayout); - } - - @Override - public DOMDataTreeCursorAwareTransaction createTransaction(final boolean isolated) { - return delegate.createTransaction(isolated); - } - - @Override - @SuppressWarnings("checkstyle:hiddenField") - public DOMDataTreeProducer createProducer(final Collection subtrees) { - // TODO we probably don't need to distribute this on the remote nodes since once we have this producer - // open we surely have the rights to all the subtrees. - return delegate.createProducer(subtrees); - } - - @Override - @SuppressWarnings("checkstyle:IllegalCatch") - public void close() throws DOMDataTreeProducerException { - delegate.close(); - - synchronized (shardAccessMap) { - shardAccessMap.values().forEach(CDSShardAccessImpl::close); - } - - final Object o = actorUtils.executeOperation(shardDataTreeActor, new ProducerRemoved(subtrees)); - if (o instanceof DOMDataTreeProducerException) { - throw (DOMDataTreeProducerException) o; - } else if (o instanceof Throwable) { - throw new DOMDataTreeProducerException("Unable to close producer", (Throwable) o); - } - } - - @Override - protected DOMDataTreeProducer delegate() { - return delegate; - } - - @Override - public CDSShardAccess getShardAccess(final DOMDataTreeIdentifier subtree) { - checkArgument(subtrees.stream().anyMatch(dataTreeIdentifier -> dataTreeIdentifier.contains(subtree)), - "Subtree %s is not controlled by this producer %s", subtree, this); - - final DOMDataTreePrefixTableEntry> lookup = - shardTable.lookup(subtree); - checkState(lookup != null, "Subtree %s is not contained in any registered shard.", subtree); - - final DOMDataTreeIdentifier lookupId = lookup.getValue().getPrefix(); - - synchronized (shardAccessMap) { - if (shardAccessMap.get(lookupId) != null) { - return shardAccessMap.get(lookupId); - } - - // TODO Maybe we can have static factory method and return the same instance - // for same subtrees. But maybe it is not needed since there can be only one - // producer attached to some subtree at a time. And also how we can close ShardAccess - // then - final CDSShardAccessImpl shardAccess = new CDSShardAccessImpl(lookupId, actorUtils); - shardAccessMap.put(lookupId, shardAccess); - return shardAccess; - } - } - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/LookupTask.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/LookupTask.java deleted file mode 100644 index 244833da00..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/LookupTask.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import static akka.actor.ActorRef.noSender; - -import akka.actor.ActorRef; -import akka.actor.Status; -import org.eclipse.jdt.annotation.Nullable; - -/** - * Base class for lookup tasks. Lookup tasks are supposed to run repeatedly until successful lookup or maximum retries - * are hit. This class is NOT thread-safe. - */ -@Deprecated(forRemoval = true) -abstract class LookupTask implements Runnable { - private final int maxRetries; - private final ActorRef replyTo; - private int retried = 0; - - LookupTask(final ActorRef replyTo, final int maxRetries) { - this.replyTo = replyTo; - this.maxRetries = maxRetries; - } - - abstract void reschedule(int retries); - - void tryReschedule(final @Nullable Throwable throwable) { - if (retried <= maxRetries) { - retried++; - reschedule(retried); - } else { - fail(throwable); - } - } - - void fail(final @Nullable Throwable throwable) { - if (throwable == null) { - replyTo.tell(new Status.Failure( - new DOMDataTreeShardCreationFailedException("Unable to find the backend shard." - + "Failing..")), noSender()); - } else { - replyTo.tell(new Status.Failure( - new DOMDataTreeShardCreationFailedException("Unable to find the backend shard." - + "Failing..", throwable)), noSender()); - } - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/OSGiDistributedShardedDOMDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/OSGiDistributedShardedDOMDataTree.java deleted file mode 100644 index 534811167e..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/OSGiDistributedShardedDOMDataTree.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import com.google.common.collect.ClassToInstanceMap; -import java.util.Collection; -import java.util.concurrent.CompletionStage; -import org.opendaylight.controller.cluster.ActorSystemProvider; -import org.opendaylight.controller.cluster.access.concepts.MemberName; -import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeListener; -import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException; -import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; -import org.opendaylight.mdsal.dom.api.DOMDataTreeService; -import org.opendaylight.mdsal.dom.api.DOMDataTreeServiceExtension; -import org.opendaylight.mdsal.dom.api.DOMDataTreeShard; -import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException; -import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.osgi.service.component.annotations.Activate; -import org.osgi.service.component.annotations.Component; -import org.osgi.service.component.annotations.Deactivate; -import org.osgi.service.component.annotations.Reference; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Component(immediate = true, property = "type=default") -@Deprecated(forRemoval = true) -public final class OSGiDistributedShardedDOMDataTree - implements DOMDataTreeService, DOMDataTreeShardingService, DistributedShardFactory { - private static final Logger LOG = LoggerFactory.getLogger(OSGiDistributedShardedDOMDataTree.class); - - @Reference - ActorSystemProvider actorSystemProvider = null; - @Reference(target = "(type=distributed-config)") - DistributedDataStoreInterface configDatastore = null; - @Reference(target = "(type=distributed-operational)") - DistributedDataStoreInterface operDatastore = null; - - private DistributedShardedDOMDataTree delegate; - - @Override - public DOMDataTreeProducer createProducer(final Collection subtrees) { - return delegate.createProducer(subtrees); - } - - @Override - public ClassToInstanceMap getExtensions() { - return delegate.getExtensions(); - } - - @Override - public CompletionStage createDistributedShard(final DOMDataTreeIdentifier prefix, - final Collection replicaMembers) throws DOMDataTreeShardingConflictException { - return delegate.createDistributedShard(prefix, replicaMembers); - } - - @Override - public ListenerRegistration registerDataTreeShard( - final DOMDataTreeIdentifier prefix, final T shard, final DOMDataTreeProducer producer) - throws DOMDataTreeShardingConflictException { - return delegate.registerDataTreeShard(prefix, shard, producer); - } - - @Override - public ListenerRegistration registerListener(final T listener, - final Collection subtrees, final boolean allowRxMerges, - final Collection producers) throws DOMDataTreeLoopException { - return delegate.registerListener(listener, subtrees, allowRxMerges, producers); - } - - @Activate - void activate() { - LOG.info("Distributed DOM Data Tree Service starting"); - delegate = new DistributedShardedDOMDataTree(actorSystemProvider, operDatastore, configDatastore); - delegate.init(); - LOG.info("Distributed DOM Data Tree Service started"); - } - - @Deactivate - void deactivate() { - LOG.info("Distributed DOM Data Tree Service stopping"); - // TODO: this needs a shutdown hook, I think - delegate = null; - LOG.info("Distributed DOM Data Tree Service stopped"); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigUpdateHandler.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigUpdateHandler.java deleted file mode 100644 index 44b68a9805..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigUpdateHandler.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import static akka.actor.ActorRef.noSender; -import static java.util.Objects.requireNonNull; -import static org.opendaylight.controller.cluster.datastore.utils.ClusterUtils.SHARD_PREFIX_QNAME; -import static org.opendaylight.controller.cluster.datastore.utils.ClusterUtils.SHARD_REPLICAS_QNAME; -import static org.opendaylight.controller.cluster.datastore.utils.ClusterUtils.SHARD_REPLICA_QNAME; - -import akka.actor.ActorRef; -import java.util.Collection; -import java.util.EnumMap; -import java.util.List; -import java.util.stream.Collectors; -import org.opendaylight.controller.cluster.access.concepts.MemberName; -import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; -import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; -import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy; -import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener; -import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; -import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode; -import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Listens on changes on prefix-shard-configuration. Resolves the changes and - * notifies handling actor with {@link PrefixShardCreated} and - * {@link PrefixShardRemoved} messages. - */ -@Deprecated(forRemoval = true) -public class PrefixedShardConfigUpdateHandler { - - private static final Logger LOG = LoggerFactory.getLogger(PrefixedShardConfigUpdateHandler.class); - private final ActorRef handlingActor; - private final MemberName memberName; - - private final EnumMap> registrations = - new EnumMap<>(LogicalDatastoreType.class); - - public PrefixedShardConfigUpdateHandler(final ActorRef handlingActor, final MemberName memberName) { - this.handlingActor = requireNonNull(handlingActor); - this.memberName = requireNonNull(memberName); - } - - public void initListener(final DistributedDataStoreInterface dataStore, final LogicalDatastoreType type) { - registrations.put(type, dataStore.registerShardConfigListener( - ClusterUtils.SHARD_LIST_PATH, new ShardConfigHandler(memberName, type, handlingActor))); - } - - public void close() { - registrations.values().forEach(ListenerRegistration::close); - registrations.clear(); - } - - public static final class ShardConfigHandler implements ClusteredDOMDataTreeChangeListener { - - private final MemberName memberName; - private final LogicalDatastoreType type; - private final ActorRef handlingActor; - private final String logName; - - public ShardConfigHandler(final MemberName memberName, - final LogicalDatastoreType type, - final ActorRef handlingActor) { - this.memberName = memberName; - this.type = type; - this.handlingActor = handlingActor; - logName = memberName.getName() + "-" + type; - } - - @Override - public void onDataTreeChanged(final Collection changes) { - changes.forEach(this::resolveChange); - } - - private void resolveChange(final DataTreeCandidate candidate) { - switch (candidate.getRootNode().getModificationType()) { - case UNMODIFIED: - break; - case APPEARED: - case DELETE: - case DISAPPEARED: - case SUBTREE_MODIFIED: - case WRITE: - resolveModifiedRoot(candidate.getRootNode()); - break; - default: - break; - } - } - - private void resolveModifiedRoot(final DataTreeCandidateNode rootNode) { - - LOG.debug("{}: New config received {}", logName, rootNode); - LOG.debug("{}: Data after: {}", logName, rootNode.getDataAfter()); - - // were in the shards list, iter children and resolve - for (final DataTreeCandidateNode childNode : rootNode.getChildNodes()) { - switch (childNode.getModificationType()) { - case UNMODIFIED: - break; - case SUBTREE_MODIFIED: - case APPEARED: - case WRITE: - resolveWrittenShard(childNode); - break; - case DELETE: - case DISAPPEARED: - resolveDeletedShard(childNode); - break; - default: - break; - } - } - } - - @SuppressWarnings("unchecked") - private void resolveWrittenShard(final DataTreeCandidateNode childNode) { - final MapEntryNode entryNode = (MapEntryNode) childNode.getDataAfter().get(); - final LeafNode prefix = - (LeafNode) entryNode.getChild(new NodeIdentifier(SHARD_PREFIX_QNAME)).get(); - - final YangInstanceIdentifier identifier = prefix.getValue(); - - LOG.debug("{}: Deserialized {} from datastore", logName, identifier); - - final ContainerNode replicas = - (ContainerNode) entryNode.getChild(new NodeIdentifier(SHARD_REPLICAS_QNAME)).get(); - - final LeafSetNode replicaList = - (LeafSetNode) replicas.getChild(new NodeIdentifier(SHARD_REPLICA_QNAME)).get(); - - final List retReplicas = replicaList.getValue().stream() - .map(child -> MemberName.forName(child.getValue())) - .collect(Collectors.toList()); - - LOG.debug("{}: Replicas read from ds {}", logName, retReplicas.toString()); - - final PrefixShardConfiguration newConfig = - new PrefixShardConfiguration(new DOMDataTreeIdentifier(type, identifier), - PrefixShardStrategy.NAME, retReplicas); - - LOG.debug("{}: Resulting config {} - sending PrefixShardCreated to {}", logName, newConfig, handlingActor); - - handlingActor.tell(new PrefixShardCreated(newConfig), noSender()); - } - - private void resolveDeletedShard(final DataTreeCandidateNode childNode) { - - final MapEntryNode entryNode = (MapEntryNode) childNode.getDataBefore().get(); - - final LeafNode prefix = - (LeafNode) entryNode.getChild(new NodeIdentifier(SHARD_PREFIX_QNAME)).get(); - - final YangInstanceIdentifier deleted = prefix.getValue(); - LOG.debug("{}: Removing shard at {}.", memberName, deleted); - - final DOMDataTreeIdentifier domDataTreeIdentifier = new DOMDataTreeIdentifier(type, deleted); - final PrefixShardRemoved message = new PrefixShardRemoved(domDataTreeIdentifier); - - handlingActor.tell(message, noSender()); - } - - @Override - public String toString() { - return "ShardConfigHandler [logName=" + logName + ", handlingActor=" + handlingActor + "]"; - } - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigWriter.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigWriter.java deleted file mode 100644 index 6d9c8e9213..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigWriter.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.sharding; - -import com.google.common.util.concurrent.AsyncFunction; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; -import java.util.Collection; -import java.util.concurrent.ExecutionException; -import org.opendaylight.controller.cluster.access.concepts.MemberName; -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory; -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientSnapshot; -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; -import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; -import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; -import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue; -import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode; -import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.ListNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetEntryNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapEntryNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapNodeBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Writes and removes prefix-based shards' configuration - * to prefix-shard-configuration. This classed is meant to be utilized - * by {@link DistributedShardedDOMDataTree} for updating - * prefix-shard-configuration upon creating and de-spawning prefix-based shards. - */ -@Deprecated(forRemoval = true) -class PrefixedShardConfigWriter { - - private static final Logger LOG = LoggerFactory.getLogger(PrefixedShardConfigWriter.class); - - private final ClientLocalHistory history; - - PrefixedShardConfigWriter(final DataStoreClient client) { - history = client.createLocalHistory(); - writeInitialParent(); - } - - ListenableFuture writeConfig(final YangInstanceIdentifier path, final Collection replicas) { - LOG.debug("Writing config for {}, replicas {}", path, replicas); - - return doSubmit(doWrite(path, replicas)); - } - - ListenableFuture removeConfig(final YangInstanceIdentifier path) { - LOG.debug("Removing config for {}.", path); - - return doSubmit(doDelete(path)); - } - - private void writeInitialParent() { - final ClientTransaction tx = history.createTransaction(); - - final DOMDataTreeWriteCursor cursor = tx.openCursor(); - - final ContainerNode root = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new NodeIdentifier(ClusterUtils.PREFIX_SHARDS_QNAME)) - .withChild(ImmutableMapNodeBuilder.create() - .withNodeIdentifier(new NodeIdentifier(ClusterUtils.SHARD_LIST_QNAME)) - .build()) - .build(); - - cursor.merge(ClusterUtils.PREFIX_SHARDS_PATH.getLastPathArgument(), root); - cursor.close(); - - final DOMStoreThreePhaseCommitCohort cohort = tx.ready(); - - submitBlocking(cohort); - } - - private static void submitBlocking(final DOMStoreThreePhaseCommitCohort cohort) { - try { - doSubmit(cohort).get(); - } catch (final InterruptedException | ExecutionException e) { - LOG.error("Unable to write initial shard config parent.", e); - } - } - - private static ListenableFuture doSubmit(final DOMStoreThreePhaseCommitCohort cohort) { - final AsyncFunction validateFunction = input -> cohort.preCommit(); - final AsyncFunction prepareFunction = input -> cohort.commit(); - - final ListenableFuture prepareFuture = Futures.transformAsync(cohort.canCommit(), validateFunction, - MoreExecutors.directExecutor()); - return Futures.transformAsync(prepareFuture, prepareFunction, MoreExecutors.directExecutor()); - } - - boolean checkDefaultIsPresent() { - final NodeIdentifierWithPredicates pag = - NodeIdentifierWithPredicates.of(ClusterUtils.SHARD_LIST_QNAME, ClusterUtils.SHARD_PREFIX_QNAME, - YangInstanceIdentifier.empty()); - - final YangInstanceIdentifier defaultId = ClusterUtils.SHARD_LIST_PATH.node(pag); - - final ClientSnapshot snapshot = history.takeSnapshot(); - try { - return snapshot.exists(defaultId).get(); - } catch (InterruptedException | ExecutionException e) { - LOG.error("Presence check of default shard in configuration failed.", e); - return false; - } finally { - snapshot.abort(); - } - } - - private DOMStoreThreePhaseCommitCohort doWrite(final YangInstanceIdentifier path, - final Collection replicas) { - - final ListNodeBuilder> replicaListBuilder = - ImmutableLeafSetNodeBuilder.create().withNodeIdentifier( - new NodeIdentifier(ClusterUtils.SHARD_REPLICA_QNAME)); - - replicas.forEach(name -> replicaListBuilder.withChild( - ImmutableLeafSetEntryNodeBuilder.create() - .withNodeIdentifier(new NodeWithValue<>(ClusterUtils.SHARD_REPLICA_QNAME, name.getName())) - .withValue(name.getName()) - .build())); - - final MapEntryNode newEntry = ImmutableMapEntryNodeBuilder.create() - .withNodeIdentifier( - NodeIdentifierWithPredicates.of(ClusterUtils.SHARD_LIST_QNAME, ClusterUtils.SHARD_PREFIX_QNAME, - path)) - .withChild(ImmutableLeafNodeBuilder.create() - .withNodeIdentifier(new NodeIdentifier(ClusterUtils.SHARD_PREFIX_QNAME)) - .withValue(path) - .build()) - .withChild(ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new NodeIdentifier(ClusterUtils.SHARD_REPLICAS_QNAME)) - .withChild(replicaListBuilder.build()) - .build()) - .build(); - - final ClientTransaction tx = history.createTransaction(); - final DOMDataTreeWriteCursor cursor = tx.openCursor(); - - ClusterUtils.SHARD_LIST_PATH.getPathArguments().forEach(cursor::enter); - - cursor.write(newEntry.getIdentifier(), newEntry); - cursor.close(); - - return tx.ready(); - } - - private DOMStoreThreePhaseCommitCohort doDelete(final YangInstanceIdentifier path) { - - final ClientTransaction tx = history.createTransaction(); - final DOMDataTreeWriteCursor cursor = tx.openCursor(); - - ClusterUtils.SHARD_LIST_PATH.getPathArguments().forEach(cursor::enter); - - cursor.delete( - NodeIdentifierWithPredicates.of(ClusterUtils.SHARD_LIST_QNAME, ClusterUtils.SHARD_PREFIX_QNAME, path)); - cursor.close(); - - return tx.ready(); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActor.java deleted file mode 100644 index f8e19180dd..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActor.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import static java.util.Objects.requireNonNull; - -import akka.actor.ActorRef; -import akka.actor.Props; -import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; -import org.opendaylight.controller.cluster.dom.api.LeaderLocation; -import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener; -import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; -import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; -import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; - -/** - * Proxy actor which acts as a facade for user-provided - * {@link LeaderLocationListener}. It subscribes for {@link LeaderStateChanged} - * notifications in its pre start hook and translates them to - * {@link LeaderLocationListener#onLeaderLocationChanged(LeaderLocation)} - * events. - */ -@Deprecated(forRemoval = true) -public final class RoleChangeListenerActor extends AbstractUntypedActor { - private final LeaderLocationListener leaderLocationListener; - private final ActorRef roleChangeNotifier; - - private RoleChangeListenerActor(final ActorRef roleChangeNotifier, final LeaderLocationListener listener) { - this.roleChangeNotifier = requireNonNull(roleChangeNotifier); - this.leaderLocationListener = requireNonNull(listener); - } - - @Override - public void preStart() throws Exception { - super.preStart(); - roleChangeNotifier.tell(new RegisterRoleChangeListener(), getSelf()); - } - - @Override - protected void handleReceive(final Object message) { - if (message instanceof RoleChangeNotification) { - ignoreMessage(message); - } else if (message instanceof LeaderStateChanged) { - onLeaderStateChanged((LeaderStateChanged) message); - } else { - unknownMessage(message); - } - } - - private void onLeaderStateChanged(final LeaderStateChanged message) { - final LeaderLocation newLocation; - if (message.getLeaderId() == null) { - newLocation = LeaderLocation.UNKNOWN; - } else if (message.getMemberId().equals(message.getLeaderId())) { - newLocation = LeaderLocation.LOCAL; - } else { - newLocation = LeaderLocation.REMOTE; - } - - // TODO should we wrap this in try catch block? - leaderLocationListener.onLeaderLocationChanged(newLocation); - } - - public static Props props(final ActorRef roleChangeNotifier, final LeaderLocationListener listener) { - return Props.create(RoleChangeListenerActor.class, roleChangeNotifier, listener); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyProducer.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyProducer.java deleted file mode 100644 index 2990c62580..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyProducer.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import static java.util.Objects.requireNonNull; - -import com.google.common.collect.ImmutableList; -import java.util.Collection; -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory; -import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardProducer; -import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction; - -/** - * Proxy producer implementation that creates transactions that forward all calls to {@link DataStoreClient}. - */ -@Deprecated(forRemoval = true) -class ShardProxyProducer implements DOMDataTreeShardProducer { - private final DOMDataTreeIdentifier shardRoot; - private final Collection prefixes; - private final ClientLocalHistory history; - private DistributedShardModificationFactory modificationFactory; - - ShardProxyProducer(final DOMDataTreeIdentifier shardRoot, - final Collection prefixes, - final DataStoreClient client, - final DistributedShardModificationFactory modificationFactory) { - this.shardRoot = requireNonNull(shardRoot); - this.prefixes = ImmutableList.copyOf(prefixes); - this.modificationFactory = requireNonNull(modificationFactory); - history = requireNonNull(client).createLocalHistory(); - } - - @Override - public Collection getPrefixes() { - return prefixes; - } - - @Override - public DOMDataTreeShardWriteTransaction createTransaction() { - return new ShardProxyTransaction(shardRoot, prefixes, - modificationFactory.createModification(history.createTransaction())); - } - - DistributedShardModificationFactory getModificationFactory() { - return modificationFactory; - } - - void setModificationFactory(final DistributedShardModificationFactory modificationFactory) { - this.modificationFactory = requireNonNull(modificationFactory); - } - - @Override - public void close() { - // FIXME: implement this - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyTransaction.java deleted file mode 100644 index fde000f9fd..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyTransaction.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; -import static java.util.Objects.requireNonNull; - -import com.google.common.util.concurrent.AsyncFunction; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.stream.Collectors; -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; -import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction; -import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext; -import org.opendaylight.mdsal.dom.spi.shard.ForeignShardThreePhaseCommitCohort; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Proxy {@link DOMDataTreeShardWriteTransaction} that creates a proxy cursor that translates all calls into - * {@link ClientTransaction} calls. - */ -@Deprecated(forRemoval = true) -class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction { - - private static final Logger LOG = LoggerFactory.getLogger(ShardProxyTransaction.class); - - private final DOMDataTreeIdentifier shardRoot; - private final Collection prefixes; - private final DistributedShardModification modification; - private ClientTransaction currentTx; - private final List cohorts = new ArrayList<>(); - - private DOMDataTreeWriteCursor cursor = null; - - ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot, - final Collection prefixes, - final DistributedShardModification modification) { - this.shardRoot = requireNonNull(shardRoot); - this.prefixes = requireNonNull(prefixes); - this.modification = requireNonNull(modification); - } - - private DOMDataTreeWriteCursor getCursor() { - if (cursor == null) { - cursor = new DistributedShardModificationCursor(modification, this); - } - return cursor; - } - - @Override - public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) { - checkAvailable(prefix); - final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier()); - final DOMDataTreeWriteCursor ret = getCursor(); - ret.enter(relativePath.getPathArguments()); - return ret; - } - - void cursorClosed() { - cursor = null; - modification.cursorClosed(); - } - - private void checkAvailable(final DOMDataTreeIdentifier prefix) { - for (final DOMDataTreeIdentifier p : prefixes) { - if (p.contains(prefix)) { - return; - } - } - throw new IllegalArgumentException("Prefix[" + prefix + "] not available for this transaction. " - + "Available prefixes: " + prefixes); - } - - private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) { - final Optional relative = - path.relativeTo(modification.getPrefix().getRootIdentifier()); - checkArgument(relative.isPresent()); - return relative.get(); - } - - @Override - public void ready() { - LOG.debug("Readying transaction for shard {}", shardRoot); - - requireNonNull(modification, "Attempting to ready an empty transaction."); - - cohorts.add(modification.seal()); - for (Entry entry - : modification.getChildShards().entrySet()) { - cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue())); - } - } - - @Override - public void close() { - cohorts.forEach(DOMStoreThreePhaseCommitCohort::abort); - cohorts.clear(); - - if (currentTx != null) { - currentTx.abort(); - currentTx = null; - } - } - - @Override - public ListenableFuture submit() { - LOG.debug("Submitting transaction for shard {}", shardRoot); - - checkTransactionReadied(); - - final AsyncFunction validateFunction = input -> prepare(); - final AsyncFunction prepareFunction = input -> commit(); - - // transform validate into prepare - final ListenableFuture prepareFuture = Futures.transformAsync(validate(), validateFunction, - MoreExecutors.directExecutor()); - // transform prepare into commit and return as submit result - return Futures.transformAsync(prepareFuture, prepareFunction, MoreExecutors.directExecutor()); - } - - private void checkTransactionReadied() { - checkState(!cohorts.isEmpty(), "Transaction not readied yet"); - } - - @Override - public ListenableFuture validate() { - LOG.debug("Validating transaction for shard {}", shardRoot); - - checkTransactionReadied(); - final List> futures = - cohorts.stream().map(DOMStoreThreePhaseCommitCohort::canCommit).collect(Collectors.toList()); - final SettableFuture ret = SettableFuture.create(); - - Futures.addCallback(Futures.allAsList(futures), new FutureCallback>() { - @Override - public void onSuccess(final List result) { - ret.set(true); - } - - @Override - public void onFailure(final Throwable throwable) { - ret.setException(throwable); - } - }, MoreExecutors.directExecutor()); - - return ret; - } - - @Override - public ListenableFuture prepare() { - LOG.debug("Preparing transaction for shard {}", shardRoot); - - checkTransactionReadied(); - final List> futures = - cohorts.stream().map(DOMStoreThreePhaseCommitCohort::preCommit).collect(Collectors.toList()); - final SettableFuture ret = SettableFuture.create(); - - Futures.addCallback(Futures.allAsList(futures), new FutureCallback>() { - @Override - public void onSuccess(final List result) { - ret.set(null); - } - - @Override - public void onFailure(final Throwable throwable) { - ret.setException(throwable); - } - }, MoreExecutors.directExecutor()); - - return ret; - } - - @Override - public ListenableFuture commit() { - LOG.debug("Committing transaction for shard {}", shardRoot); - - checkTransactionReadied(); - final List> futures = - cohorts.stream().map(DOMStoreThreePhaseCommitCohort::commit).collect(Collectors.toList()); - final SettableFuture ret = SettableFuture.create(); - - Futures.addCallback(Futures.allAsList(futures), new FutureCallback>() { - @Override - public void onSuccess(final List result) { - ret.set(null); - } - - @Override - public void onFailure(final Throwable throwable) { - ret.setException(throwable); - } - }, MoreExecutors.directExecutor()); - - return ret; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java deleted file mode 100644 index 52c3d25faa..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java +++ /dev/null @@ -1,829 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import static java.util.Objects.requireNonNull; - -import akka.actor.ActorRef; -import akka.actor.ActorSelection; -import akka.actor.ActorSystem; -import akka.actor.PoisonPill; -import akka.actor.Props; -import akka.actor.Status; -import akka.actor.Status.Success; -import akka.cluster.ClusterEvent; -import akka.cluster.ClusterEvent.MemberExited; -import akka.cluster.ClusterEvent.MemberRemoved; -import akka.cluster.ClusterEvent.MemberUp; -import akka.cluster.ClusterEvent.MemberWeaklyUp; -import akka.cluster.ClusterEvent.ReachableMember; -import akka.cluster.ClusterEvent.UnreachableMember; -import akka.cluster.Member; -import akka.dispatch.OnComplete; -import akka.pattern.Patterns; -import akka.util.Timeout; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.cluster.access.concepts.MemberName; -import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; -import org.opendaylight.controller.cluster.datastore.ClusterWrapper; -import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; -import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; -import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; -import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; -import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; -import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; -import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard; -import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated; -import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved; -import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated; -import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved; -import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; -import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException; -import org.opendaylight.mdsal.dom.api.DOMDataTreeShard; -import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration; -import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry; -import org.opendaylight.yangtools.concepts.AbstractObjectRegistration; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.compat.java8.FutureConverters; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -/** - * Actor that tracks currently open producers/shards on remote nodes and handles notifications of remote - * nodes of newly open producers/shards on the local node. - */ -@Deprecated(forRemoval = true) -public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { - - private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class); - - private static final String PERSISTENCE_ID = "sharding-service-actor"; - private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS); - - static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS); - - private final DistributedShardedDOMDataTree shardingService; - private final ActorSystem actorSystem; - private final ClusterWrapper clusterWrapper; - // helper actorContext used only for static calls to executeAsync etc - // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore - private final ActorUtils actorUtils; - private final ShardingServiceAddressResolver resolver; - private final DistributedDataStoreInterface distributedConfigDatastore; - private final DistributedDataStoreInterface distributedOperDatastore; - private final int lookupTaskMaxRetries; - - private final Map idToProducer = new HashMap<>(); - - ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) { - LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName()); - - shardingService = builder.getShardingService(); - actorSystem = builder.getActorSystem(); - clusterWrapper = builder.getClusterWrapper(); - distributedConfigDatastore = builder.getDistributedConfigDatastore(); - distributedOperDatastore = builder.getDistributedOperDatastore(); - lookupTaskMaxRetries = builder.getLookupTaskMaxRetries(); - actorUtils = distributedConfigDatastore.getActorUtils(); - resolver = new ShardingServiceAddressResolver( - DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName()); - - clusterWrapper.subscribeToMemberEvents(self()); - } - - @Override - public void preStart() { - } - - @Override - protected void handleRecover(final Object message) { - LOG.debug("Received a recover message {}", message); - } - - @Override - protected void handleCommand(final Object message) { - LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message); - if (message instanceof ClusterEvent.MemberUp) { - memberUp((ClusterEvent.MemberUp) message); - } else if (message instanceof ClusterEvent.MemberWeaklyUp) { - memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message); - } else if (message instanceof ClusterEvent.MemberExited) { - memberExited((ClusterEvent.MemberExited) message); - } else if (message instanceof ClusterEvent.MemberRemoved) { - memberRemoved((ClusterEvent.MemberRemoved) message); - } else if (message instanceof ClusterEvent.UnreachableMember) { - memberUnreachable((ClusterEvent.UnreachableMember) message); - } else if (message instanceof ClusterEvent.ReachableMember) { - memberReachable((ClusterEvent.ReachableMember) message); - } else if (message instanceof ProducerCreated) { - onProducerCreated((ProducerCreated) message); - } else if (message instanceof NotifyProducerCreated) { - onNotifyProducerCreated((NotifyProducerCreated) message); - } else if (message instanceof ProducerRemoved) { - onProducerRemoved((ProducerRemoved) message); - } else if (message instanceof NotifyProducerRemoved) { - onNotifyProducerRemoved((NotifyProducerRemoved) message); - } else if (message instanceof PrefixShardCreated) { - onPrefixShardCreated((PrefixShardCreated) message); - } else if (message instanceof LookupPrefixShard) { - onLookupPrefixShard((LookupPrefixShard) message); - } else if (message instanceof PrefixShardRemovalLookup) { - onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message); - } else if (message instanceof PrefixShardRemoved) { - onPrefixShardRemoved((PrefixShardRemoved) message); - } else if (message instanceof StartConfigShardLookup) { - onStartConfigShardLookup((StartConfigShardLookup) message); - } - } - - @Override - public String persistenceId() { - return PERSISTENCE_ID; - } - - private void memberUp(final MemberUp message) { - final MemberName memberName = memberToName(message.member()); - - LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, - message.member().address()); - - resolver.addPeerAddress(memberName, message.member().address()); - } - - private void memberWeaklyUp(final MemberWeaklyUp message) { - final MemberName memberName = memberToName(message.member()); - - LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName, - message.member().address()); - - resolver.addPeerAddress(memberName, message.member().address()); - } - - private void memberExited(final MemberExited message) { - final MemberName memberName = memberToName(message.member()); - - LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName, - message.member().address()); - - resolver.removePeerAddress(memberName); - } - - private void memberRemoved(final MemberRemoved message) { - final MemberName memberName = memberToName(message.member()); - - LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, - message.member().address()); - - resolver.removePeerAddress(memberName); - } - - private void memberUnreachable(final UnreachableMember message) { - final MemberName memberName = memberToName(message.member()); - LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address()); - - resolver.removePeerAddress(memberName); - } - - private void memberReachable(final ReachableMember message) { - final MemberName memberName = memberToName(message.member()); - LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address()); - - resolver.addPeerAddress(memberName, message.member().address()); - } - - private void onProducerCreated(final ProducerCreated message) { - LOG.debug("Received ProducerCreated: {}", message); - - // fastpath if we have no peers - if (resolver.getShardingServicePeerActorAddresses().isEmpty()) { - getSender().tell(new Status.Success(null), ActorRef.noSender()); - } - - final ActorRef sender = getSender(); - final Collection subtrees = message.getSubtrees(); - - final List> futures = new ArrayList<>(); - - for (final String address : resolver.getShardingServicePeerActorAddresses()) { - final ActorSelection actorSelection = actorSystem.actorSelection(address); - futures.add( - FutureConverters.toJava( - actorUtils.executeOperationAsync( - actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT)) - .toCompletableFuture()); - } - - final CompletableFuture combinedFuture = CompletableFuture.allOf( - futures.toArray(new CompletableFuture[futures.size()])); - - combinedFuture - .thenRun(() -> sender.tell(new Success(null), ActorRef.noSender())) - .exceptionally(throwable -> { - sender.tell(new Status.Failure(throwable), self()); - return null; - }); - } - - private void onNotifyProducerCreated(final NotifyProducerCreated message) { - LOG.debug("Received NotifyProducerCreated: {}", message); - - final Collection subtrees = message.getSubtrees(); - - try { - final ActorProducerRegistration registration = - new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees); - subtrees.forEach(id -> idToProducer.put(id, registration)); - sender().tell(new Status.Success(null), self()); - } catch (final IllegalArgumentException e) { - sender().tell(new Status.Failure(e), getSelf()); - } - } - - private void onProducerRemoved(final ProducerRemoved message) { - LOG.debug("Received ProducerRemoved: {}", message); - - final List> futures = new ArrayList<>(); - - for (final String address : resolver.getShardingServicePeerActorAddresses()) { - final ActorSelection selection = actorSystem.actorSelection(address); - - futures.add(FutureConverters.toJava( - actorUtils.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees()))) - .toCompletableFuture()); - } - - final CompletableFuture combinedFuture = CompletableFuture.allOf( - futures.toArray(new CompletableFuture[futures.size()])); - - final ActorRef respondTo = getSender(); - - combinedFuture - .thenRun(() -> respondTo.tell(new Status.Success(null), self())) - .exceptionally(e -> { - respondTo.tell(new Status.Failure(null), self()); - return null; - }); - - } - - private void onNotifyProducerRemoved(final NotifyProducerRemoved message) { - LOG.debug("Received NotifyProducerRemoved: {}", message); - - final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next()); - if (registration == null) { - LOG.warn("The notification contained a path on which no producer is registered, throwing away"); - getSender().tell(new Status.Success(null), ActorRef.noSender()); - return; - } - - try { - registration.close(); - getSender().tell(new Status.Success(null), ActorRef.noSender()); - } catch (final DOMDataTreeProducerException e) { - LOG.error("Unable to close producer", e); - getSender().tell(new Status.Failure(e), ActorRef.noSender()); - } - } - - @SuppressWarnings("checkstyle:IllegalCatch") - private void onLookupPrefixShard(final LookupPrefixShard message) { - LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message); - - final DOMDataTreeIdentifier prefix = message.getPrefix(); - - final ActorUtils utils = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION - ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils(); - - // schedule a notification task for the reply - actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, - new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper, - utils, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher()); - } - - private void onPrefixShardCreated(final PrefixShardCreated message) { - LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message); - - final PrefixShardConfiguration config = message.getConfiguration(); - - shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix())); - } - - private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) { - LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message); - - final ShardRemovalLookupTask removalTask = - new ShardRemovalLookupTask(actorSystem, getSender(), - actorUtils, message.getPrefix(), lookupTaskMaxRetries); - - actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher()); - } - - private void onPrefixShardRemoved(final PrefixShardRemoved message) { - LOG.debug("Received PrefixShardRemoved: {}", message); - - shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix())); - } - - private void onStartConfigShardLookup(final StartConfigShardLookup message) { - LOG.debug("Received StartConfigShardLookup: {}", message); - - final ActorUtils context = - message.getType().equals(LogicalDatastoreType.CONFIGURATION) - ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils(); - - // schedule a notification task for the reply - actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, - new ConfigShardLookupTask( - actorSystem, getSender(), context, message, lookupTaskMaxRetries), - actorSystem.dispatcher()); - } - - private static MemberName memberToName(final Member member) { - return MemberName.forName(member.roles().iterator().next()); - } - - private class ActorProducerRegistration { - - private final DOMDataTreeProducer producer; - private final Collection subtrees; - - ActorProducerRegistration(final DOMDataTreeProducer producer, - final Collection subtrees) { - this.producer = producer; - this.subtrees = subtrees; - } - - void close() throws DOMDataTreeProducerException { - producer.close(); - subtrees.forEach(idToProducer::remove); - } - } - - private static class ShardFrontendRegistration extends - AbstractObjectRegistration> { - - private final ActorRef clientActor; - private final ListenerRegistration shardRegistration; - - ShardFrontendRegistration(final ActorRef clientActor, - final ListenerRegistration shardRegistration) { - super(shardRegistration); - this.clientActor = clientActor; - this.shardRegistration = shardRegistration; - } - - @Override - protected void removeRegistration() { - shardRegistration.close(); - clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - } - } - - /** - * Handles the lookup step of cds shard creation once the configuration is updated. - */ - private static class ShardCreationLookupTask extends LookupTask { - - private final ActorSystem system; - private final ActorRef replyTo; - private final ClusterWrapper clusterWrapper; - private final ActorUtils context; - private final DistributedShardedDOMDataTree shardingService; - private final DOMDataTreeIdentifier toLookup; - private final int lookupMaxRetries; - - ShardCreationLookupTask(final ActorSystem system, - final ActorRef replyTo, - final ClusterWrapper clusterWrapper, - final ActorUtils context, - final DistributedShardedDOMDataTree shardingService, - final DOMDataTreeIdentifier toLookup, - final int lookupMaxRetries) { - super(replyTo, lookupMaxRetries); - this.system = system; - this.replyTo = replyTo; - this.clusterWrapper = clusterWrapper; - this.context = context; - this.shardingService = shardingService; - this.toLookup = toLookup; - this.lookupMaxRetries = lookupMaxRetries; - } - - @Override - public void run() { - final Future localShardFuture = - context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier())); - - localShardFuture.onComplete(new OnComplete() { - @Override - public void onComplete(final Throwable throwable, final ActorRef actorRef) { - if (throwable != null) { - tryReschedule(throwable); - } else { - LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup); - - system.scheduler().scheduleOnce( - SHARD_LOOKUP_TASK_INTERVAL, - new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef, - shardingService, toLookup, lookupMaxRetries), - system.dispatcher()); - } - } - }, system.dispatcher()); - } - - @Override - void reschedule(final int retries) { - LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries); - system.scheduler().scheduleOnce( - SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher()); - } - } - - /** - * Handles the readiness step by waiting for a leader of the created shard. - */ - private static class ShardLeaderLookupTask extends LookupTask { - - private final ActorSystem system; - private final ActorRef replyTo; - private final ActorUtils context; - private final ClusterWrapper clusterWrapper; - private final ActorRef shard; - private final DistributedShardedDOMDataTree shardingService; - private final DOMDataTreeIdentifier toLookup; - private final int lookupMaxRetries; - - ShardLeaderLookupTask(final ActorSystem system, - final ActorRef replyTo, - final ActorUtils context, - final ClusterWrapper clusterWrapper, - final ActorRef shard, - final DistributedShardedDOMDataTree shardingService, - final DOMDataTreeIdentifier toLookup, - final int lookupMaxRetries) { - super(replyTo, lookupMaxRetries); - this.system = system; - this.replyTo = replyTo; - this.context = context; - this.clusterWrapper = clusterWrapper; - this.shard = shard; - this.shardingService = shardingService; - this.toLookup = toLookup; - this.lookupMaxRetries = lookupMaxRetries; - } - - @Override - public void run() { - - final Future ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout()); - - ask.onComplete(new OnComplete<>() { - @Override - public void onComplete(final Throwable throwable, final Object findLeaderReply) { - if (throwable != null) { - tryReschedule(throwable); - } else { - final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply; - final Optional leaderActor = findLeader.getLeaderActor(); - if (leaderActor.isPresent()) { - // leader is found, backend seems ready, check if the frontend is ready - LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..", - clusterWrapper.getCurrentMemberName(), toLookup); - system.scheduler().scheduleOnce( - SHARD_LOOKUP_TASK_INTERVAL, - new FrontendLookupTask( - system, replyTo, shardingService, toLookup, lookupMaxRetries), - system.dispatcher()); - } else { - tryReschedule(null); - } - } - } - }, system.dispatcher()); - - } - - @Override - void reschedule(final int retries) { - LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..", - clusterWrapper.getCurrentMemberName(), toLookup, retries); - system.scheduler().scheduleOnce( - SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher()); - } - } - - /** - * After backend is ready this handles the last step - checking if we have a frontend shard for the backend, - * once this completes(which should be ready by the time the backend is created, this is just a sanity check in - * case they race), the future for the cds shard creation is completed and the shard is ready for use. - */ - private static final class FrontendLookupTask extends LookupTask { - - private final ActorSystem system; - private final ActorRef replyTo; - private final DistributedShardedDOMDataTree shardingService; - private final DOMDataTreeIdentifier toLookup; - - FrontendLookupTask(final ActorSystem system, - final ActorRef replyTo, - final DistributedShardedDOMDataTree shardingService, - final DOMDataTreeIdentifier toLookup, - final int lookupMaxRetries) { - super(replyTo, lookupMaxRetries); - this.system = system; - this.replyTo = replyTo; - this.shardingService = shardingService; - this.toLookup = toLookup; - } - - @Override - public void run() { - final DOMDataTreePrefixTableEntry> entry = - shardingService.lookupShardFrontend(toLookup); - - if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) { - replyTo.tell(new Success(null), ActorRef.noSender()); - } else { - tryReschedule(null); - } - } - - private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry entry, - final DOMDataTreeIdentifier prefix) { - if (entry == null) { - return false; - } - - if (YangInstanceIdentifier.empty().equals(prefix.getRootIdentifier())) { - return true; - } - - if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) { - return true; - } - - return false; - } - - @Override - void reschedule(final int retries) { - LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries); - system.scheduler().scheduleOnce( - SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher()); - } - } - - /** - * Task that is run once a cds shard registration is closed and completes once the backend shard is removed from the - * configuration. - */ - private static class ShardRemovalLookupTask extends LookupTask { - - private final ActorSystem system; - private final ActorRef replyTo; - private final ActorUtils context; - private final DOMDataTreeIdentifier toLookup; - - ShardRemovalLookupTask(final ActorSystem system, - final ActorRef replyTo, - final ActorUtils context, - final DOMDataTreeIdentifier toLookup, - final int lookupMaxRetries) { - super(replyTo, lookupMaxRetries); - this.system = system; - this.replyTo = replyTo; - this.context = context; - this.toLookup = toLookup; - } - - @Override - public void run() { - final Future localShardFuture = - context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier())); - - localShardFuture.onComplete(new OnComplete() { - @Override - public void onComplete(final Throwable throwable, final ActorRef actorRef) { - if (throwable != null) { - //TODO Shouldn't we check why findLocalShard failed? - LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future", - toLookup); - replyTo.tell(new Success(null), ActorRef.noSender()); - } else { - tryReschedule(null); - } - } - }, system.dispatcher()); - } - - @Override - void reschedule(final int retries) { - LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..", - toLookup, retries); - system.scheduler().scheduleOnce( - SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher()); - } - } - - /** - * Task for handling the lookup of the backend for the configuration shard. - */ - private static class ConfigShardLookupTask extends LookupTask { - - private final ActorSystem system; - private final ActorRef replyTo; - private final ActorUtils context; - - ConfigShardLookupTask(final ActorSystem system, - final ActorRef replyTo, - final ActorUtils context, - final StartConfigShardLookup message, - final int lookupMaxRetries) { - super(replyTo, lookupMaxRetries); - this.system = system; - this.replyTo = replyTo; - this.context = context; - } - - @Override - void reschedule(final int retries) { - LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries); - system.scheduler().scheduleOnce( - SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher()); - } - - @Override - public void run() { - final Optional localShard = - context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID); - - if (!localShard.isPresent()) { - tryReschedule(null); - } else { - LOG.debug("Local backend for prefix configuration shard lookup successful"); - replyTo.tell(new Status.Success(null), ActorRef.noSender()); - } - } - } - - /** - * Task for handling the readiness state of the config shard. Reports success once the leader is elected. - */ - private static class ConfigShardReadinessTask extends LookupTask { - - private final ActorSystem system; - private final ActorRef replyTo; - private final ActorUtils context; - private final ClusterWrapper clusterWrapper; - private final ActorRef shard; - - ConfigShardReadinessTask(final ActorSystem system, - final ActorRef replyTo, - final ActorUtils context, - final ClusterWrapper clusterWrapper, - final ActorRef shard, - final int lookupMaxRetries) { - super(replyTo, lookupMaxRetries); - this.system = system; - this.replyTo = replyTo; - this.context = context; - this.clusterWrapper = clusterWrapper; - this.shard = shard; - } - - @Override - void reschedule(final int retries) { - LOG.debug("{} - Leader for config shard not found on try: {}, retrying..", - clusterWrapper.getCurrentMemberName(), retries); - system.scheduler().scheduleOnce( - SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher()); - } - - @Override - public void run() { - final Future ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout()); - - ask.onComplete(new OnComplete<>() { - @Override - public void onComplete(final Throwable throwable, final Object findLeaderReply) { - if (throwable != null) { - tryReschedule(throwable); - } else { - final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply; - final Optional leaderActor = findLeader.getLeaderActor(); - if (leaderActor.isPresent()) { - // leader is found, backend seems ready, check if the frontend is ready - LOG.debug("{} - Leader for config shard is ready. Ending lookup.", - clusterWrapper.getCurrentMemberName()); - replyTo.tell(new Status.Success(null), ActorRef.noSender()); - } else { - tryReschedule(null); - } - } - } - }, system.dispatcher()); - } - } - - public static class ShardedDataTreeActorCreator { - - private DistributedShardedDOMDataTree shardingService; - private DistributedDataStoreInterface distributedConfigDatastore; - private DistributedDataStoreInterface distributedOperDatastore; - private ActorSystem actorSystem; - private ClusterWrapper cluster; - private int maxRetries; - - public DistributedShardedDOMDataTree getShardingService() { - return shardingService; - } - - public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) { - this.shardingService = shardingService; - return this; - } - - public ActorSystem getActorSystem() { - return actorSystem; - } - - public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) { - this.actorSystem = actorSystem; - return this; - } - - public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper clusterWrapper) { - this.cluster = clusterWrapper; - return this; - } - - public ClusterWrapper getClusterWrapper() { - return cluster; - } - - public DistributedDataStoreInterface getDistributedConfigDatastore() { - return distributedConfigDatastore; - } - - public ShardedDataTreeActorCreator setDistributedConfigDatastore( - final DistributedDataStoreInterface distributedConfigDatastore) { - this.distributedConfigDatastore = distributedConfigDatastore; - return this; - } - - public DistributedDataStoreInterface getDistributedOperDatastore() { - return distributedOperDatastore; - } - - public ShardedDataTreeActorCreator setDistributedOperDatastore( - final DistributedDataStoreInterface distributedOperDatastore) { - this.distributedOperDatastore = distributedOperDatastore; - return this; - } - - public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int newMaxRetries) { - this.maxRetries = newMaxRetries; - return this; - } - - public int getLookupTaskMaxRetries() { - return maxRetries; - } - - private void verify() { - requireNonNull(shardingService); - requireNonNull(actorSystem); - requireNonNull(cluster); - requireNonNull(distributedConfigDatastore); - requireNonNull(distributedOperDatastore); - } - - public Props props() { - verify(); - return Props.create(ShardedDataTreeActor.class, this); - } - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardingServiceAddressResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardingServiceAddressResolver.java deleted file mode 100644 index 1ad5c389e2..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardingServiceAddressResolver.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import static com.google.common.base.Preconditions.checkNotNull; -import static java.util.Objects.requireNonNull; - -import akka.actor.Address; -import java.util.Collection; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.stream.Collectors; -import org.opendaylight.controller.cluster.access.concepts.MemberName; - -/** - * Resolver for remote {@link ShardedDataTreeActor}'s. - */ -@Deprecated(forRemoval = true) -public class ShardingServiceAddressResolver { - - private final ConcurrentMap memberNameToAddress = new ConcurrentHashMap<>(); - private final String shardingServiceActorIdentifier; - private final MemberName localMemberName; - - public ShardingServiceAddressResolver(final String shardingServiceActorIdentifier, - final MemberName localMemberName) { - this.shardingServiceActorIdentifier = shardingServiceActorIdentifier; - this.localMemberName = localMemberName; - } - - void addPeerAddress(final MemberName memberName, final Address address) { - memberNameToAddress.put(memberName, address); - } - - void removePeerAddress(final MemberName memberName) { - memberNameToAddress.remove(memberName); - } - - Address getPeerAddress(final MemberName memberName) { - return memberNameToAddress.get(memberName); - } - - StringBuilder getActorPathBuilder(final Address address) { - return new StringBuilder().append(address.toString()).append("/user/").append(shardingServiceActorIdentifier); - } - - Collection getShardingServicePeerActorAddresses() { - final Collection peerAddresses = - memberNameToAddress - .entrySet() - .stream() - .filter(entry -> !localMemberName.equals(entry.getKey())) - .map(entry -> getActorPathBuilder(entry.getValue()).toString()) - .collect(Collectors.toList()); - - return peerAddresses; - } - - public String resolve(final MemberName memberName) { - final Address address = memberNameToAddress.get(requireNonNull(memberName)); - checkNotNull(address, "Requested member[%s] is not present in the resolver", memberName); - return getActorPathBuilder(address).toString(); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/InitConfigListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/InitConfigListener.java deleted file mode 100644 index 9970e0591c..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/InitConfigListener.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.sharding.messages; - -/** - * Message sent to the local ShardManager, once the shard configuration shard is ready and the ShardManager should - * start its listener. - */ -@Deprecated(forRemoval = true) -public final class InitConfigListener { - - public static final InitConfigListener INSTANCE = new InitConfigListener(); - - private InitConfigListener() { - - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/LookupPrefixShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/LookupPrefixShard.java deleted file mode 100644 index ed58338bea..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/LookupPrefixShard.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding.messages; - -import static java.util.Objects.requireNonNull; - -import com.google.common.annotations.Beta; -import java.io.Serializable; -import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; - -/** - * Sent to the local {@link ShardedDataTreeActor} when there was a shard created - * on the local node. The local actor should notify the remote actors with {@link PrefixShardCreated} which should - * create the required frontend/backend shards. - */ -@Beta -@Deprecated(forRemoval = true) -public class LookupPrefixShard implements Serializable { - private static final long serialVersionUID = 1L; - - private final DOMDataTreeIdentifier prefix; - - public LookupPrefixShard(final DOMDataTreeIdentifier prefix) { - this.prefix = requireNonNull(prefix); - } - - public DOMDataTreeIdentifier getPrefix() { - return prefix; - } - - - @Override - public String toString() { - return "LookupPrefixShard{" - + "prefix=" - + prefix - + '}'; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/NotifyProducerCreated.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/NotifyProducerCreated.java deleted file mode 100644 index 0c256976cf..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/NotifyProducerCreated.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding.messages; - -import com.google.common.annotations.Beta; -import com.google.common.collect.ImmutableList; -import java.io.Serializable; -import java.util.Collection; -import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; - -/** - * Message sent to remote {@link ShardedDataTreeActor}'s when attempting - * to create a producer. The remote node should attempt to create a producer in the local sharding service and reply - * with success/failure based on the attempt result. - */ -@Beta -@Deprecated(forRemoval = true) -public class NotifyProducerCreated implements Serializable { - private static final long serialVersionUID = 1L; - private final Collection subtrees; - - public NotifyProducerCreated(final Collection subtrees) { - this.subtrees = ImmutableList.copyOf(subtrees); - } - - public Collection getSubtrees() { - return subtrees; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/NotifyProducerRemoved.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/NotifyProducerRemoved.java deleted file mode 100644 index b93bbc6eba..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/NotifyProducerRemoved.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding.messages; - -import com.google.common.annotations.Beta; -import com.google.common.collect.ImmutableList; -import java.io.Serializable; -import java.util.Collection; -import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; - -/** - * Message sent to remote {@link ShardedDataTreeActor}'s when attempting - * to close a producer. The remote node should attempt to close a producer in the local sharding service and reply - * with success/failure based on the attempt result. If the producer doesn't exist on this node report Success. - */ -@Beta -@Deprecated(forRemoval = true) -public class NotifyProducerRemoved implements Serializable { - private static final long serialVersionUID = 1L; - private final Collection subtrees; - - public NotifyProducerRemoved(final Collection subtrees) { - this.subtrees = ImmutableList.copyOf(subtrees); - } - - public Collection getSubtrees() { - return subtrees; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardCreated.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardCreated.java deleted file mode 100644 index 7e0a4f685e..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardCreated.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.sharding.messages; - -import com.google.common.annotations.Beta; -import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; -import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor; - -/** - * Message sent to the local {@link ShardedDataTreeActor} when a clustered - * shard was created locally. The backend shards/replicas will be handled by the ShardManager but the - * {@link ShardedDataTreeActor} needs to handle the registration of the - * frontends into the {@link org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService}. The configuration only contains - * the Member nodes that this is still yet to be distributed to. The last node will receive PrefixShardConfiguration - * with only it's member present. - */ -@Beta -@Deprecated(forRemoval = true) -public class PrefixShardCreated { - private final PrefixShardConfiguration configuration; - - public PrefixShardCreated(final PrefixShardConfiguration configuration) { - this.configuration = configuration; - } - - public PrefixShardConfiguration getConfiguration() { - return configuration; - } - - @Override - public String toString() { - return "PrefixShardCreated{" - + "configuration=" + configuration - + '}'; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemovalLookup.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemovalLookup.java deleted file mode 100644 index 87daea2eb7..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemovalLookup.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding.messages; - -import static java.util.Objects.requireNonNull; - -import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; - -/** - * Sent to the local {@link ShardedDataTreeActor} to initiate the lookup of the shard, once the shard is removed from - * the system entirely the actor responds with a success. - */ -@Deprecated(forRemoval = true) -public class PrefixShardRemovalLookup { - private final DOMDataTreeIdentifier prefix; - - public PrefixShardRemovalLookup(final DOMDataTreeIdentifier prefix) { - this.prefix = requireNonNull(prefix); - } - - public DOMDataTreeIdentifier getPrefix() { - return prefix; - } - - @Override - public String toString() { - return "PrefixShardRemovalLookup{" + "prefix=" + prefix + '}'; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemoved.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemoved.java deleted file mode 100644 index 5187bae51a..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemoved.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.sharding.messages; - -import com.google.common.annotations.Beta; -import java.io.Serializable; -import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; - -/** - * Message sent to remote {@link ShardedDataTreeActor}'s when there is an attempt to remove the shard, - * the ShardedDataTreeActor should remove the shard from the current configuration so that the change is picked up - * in the backend ShardManager. - */ -@Beta -@Deprecated(forRemoval = true) -public class PrefixShardRemoved implements Serializable { - private static final long serialVersionUID = 1L; - - private final DOMDataTreeIdentifier prefix; - - public PrefixShardRemoved(final DOMDataTreeIdentifier prefix) { - this.prefix = prefix; - } - - public DOMDataTreeIdentifier getPrefix() { - return prefix; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/ProducerCreated.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/ProducerCreated.java deleted file mode 100644 index afc9476c52..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/ProducerCreated.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.sharding.messages; - -import com.google.common.annotations.Beta; -import java.util.Collection; -import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; - -/** - * Message sent to local {@link ShardedDataTreeActor}'s when there was an - * attempt to create a producer on the local node. - */ -@Beta -@Deprecated(forRemoval = true) -public class ProducerCreated { - private final Collection subtrees; - - public ProducerCreated(final Collection subtrees) { - this.subtrees = subtrees; - } - - public Collection getSubtrees() { - return subtrees; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/ProducerRemoved.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/ProducerRemoved.java deleted file mode 100644 index 848c7b9a46..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/ProducerRemoved.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.sharding.messages; - -import com.google.common.annotations.Beta; -import java.util.Collection; -import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; - -/** - * Message sent to local {@link ShardedDataTreeActor}'s when there was an - * attempt to close a producer on the local node. - */ -@Beta -@Deprecated(forRemoval = true) -public class ProducerRemoved { - - private final Collection subtrees; - - public ProducerRemoved(final Collection subtrees) { - this.subtrees = subtrees; - } - - public Collection getSubtrees() { - return subtrees; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/StartConfigShardLookup.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/StartConfigShardLookup.java deleted file mode 100644 index 7ba9648d15..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/StartConfigShardLookup.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.sharding.messages; - -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; - -/** - * Message that should be sent to ShardedDataTreeActor when the lookup of the prefix config shard should begin. - * Replied to with Succes once the shard has a leader. - */ -@Deprecated(forRemoval = true) -public class StartConfigShardLookup { - - private final LogicalDatastoreType type; - - public StartConfigShardLookup(final LogicalDatastoreType type) { - this.type = type; - } - - public LogicalDatastoreType getType() { - return type; - } - - @Override - public String toString() { - return "StartConfigShardLookup{type=" + type + '}'; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCursorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCursorTest.java deleted file mode 100644 index b298762108..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCursorTest.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.databroker.actors.dds; - -import static org.mockito.Mockito.verify; - -import java.util.Arrays; -import java.util.stream.Collectors; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.impl.schema.Builders; - -@Deprecated(forRemoval = true) -public class ClientTransactionCursorTest { - - private static final QName NODE_1 = QName.create("ns-1", "node-1"); - private static final QName NODE_2 = QName.create(NODE_1, "node-2"); - private static final QName NODE_3 = QName.create(NODE_1, "node-3"); - - @Mock - private ClientTransaction transaction; - private ClientTransactionCursor cursor; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - cursor = new ClientTransactionCursor(transaction); - } - - @Test - public void testEnterOneNode() { - cursor.enter(YangInstanceIdentifier.NodeIdentifier.create(NODE_1)); - cursor.delete(YangInstanceIdentifier.NodeIdentifier.create(NODE_2)); - final YangInstanceIdentifier expected = createId(NODE_1, NODE_2); - verify(transaction).delete(expected); - } - - @Test - public void testEnterNodeIterables() { - final Iterable collect = toPathArg(NODE_1, NODE_2); - cursor.enter(collect); - cursor.delete(YangInstanceIdentifier.NodeIdentifier.create(NODE_3)); - final YangInstanceIdentifier expected = createId(NODE_1, NODE_2, NODE_3); - verify(transaction).delete(expected); - } - - @Test - public void testEnterNodeVarargs() { - cursor.enter(YangInstanceIdentifier.NodeIdentifier.create(NODE_1), - YangInstanceIdentifier.NodeIdentifier.create(NODE_2)); - cursor.delete(YangInstanceIdentifier.NodeIdentifier.create(NODE_3)); - final YangInstanceIdentifier expected = createId(NODE_1, NODE_2, NODE_3); - verify(transaction).delete(expected); - } - - @Test - public void testExitOneLevel() { - cursor.enter(toPathArg(NODE_1, NODE_2)); - cursor.exit(); - cursor.delete(YangInstanceIdentifier.NodeIdentifier.create(NODE_2)); - final YangInstanceIdentifier expected = createId(NODE_1, NODE_2); - verify(transaction).delete(expected); - } - - @Test - public void testExitTwoLevels() { - cursor.enter(toPathArg(NODE_1, NODE_2, NODE_3)); - cursor.exit(2); - cursor.delete(YangInstanceIdentifier.NodeIdentifier.create(NODE_2)); - final YangInstanceIdentifier expected = createId(NODE_1, NODE_2); - verify(transaction).delete(expected); - } - - @Test - public void testClose() { - cursor.close(); - verify(transaction).closeCursor(cursor); - } - - @Test - public void testDelete() { - cursor.delete(YangInstanceIdentifier.NodeIdentifier.create(NODE_1)); - final YangInstanceIdentifier expected = createId(NODE_1); - verify(transaction).delete(expected); - } - - @Test - public void testMerge() { - final YangInstanceIdentifier.NodeIdentifier path = YangInstanceIdentifier.NodeIdentifier.create(NODE_1); - final ContainerNode data = createData(path.getNodeType()); - cursor.merge(path, data); - final YangInstanceIdentifier expected = createId(NODE_1); - verify(transaction).merge(expected, data); - } - - @Test - public void testWrite() { - final YangInstanceIdentifier.NodeIdentifier path = YangInstanceIdentifier.NodeIdentifier.create(NODE_1); - final ContainerNode data = createData(path.getNodeType()); - cursor.write(path, data); - final YangInstanceIdentifier expected = createId(NODE_1); - verify(transaction).write(expected, data); - } - - private static Iterable toPathArg(final QName... pathArguments) { - return Arrays.stream(pathArguments) - .map(YangInstanceIdentifier.NodeIdentifier::create) - .collect(Collectors.toList()); - } - - private static YangInstanceIdentifier createId(final QName... pathArguments) { - return YangInstanceIdentifier.create(toPathArg(pathArguments)); - } - - private static ContainerNode createData(final QName id) { - return Builders.containerBuilder() - .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(id)) - .build(); - } - -} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionTest.java index bb8fbf15e6..486c07c8a8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionTest.java @@ -13,7 +13,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID; import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertFutureEquals; -import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException; import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.getWithTimeout; import com.google.common.util.concurrent.FluentFuture; @@ -24,7 +23,6 @@ import org.junit.Test; import org.mockito.Mock; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess; -import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -62,20 +60,6 @@ public class ClientTransactionTest extends AbstractClientHandleTest exists = getHandle().exists(PATH); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImplTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImplTest.java deleted file mode 100644 index e79222fa51..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImplTest.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; - -import akka.actor.ActorRef; -import akka.dispatch.Futures; -import java.util.Optional; -import java.util.concurrent.TimeUnit; -import org.junit.Before; -import org.junit.Test; -import org.opendaylight.controller.cluster.datastore.AbstractActorTest; -import org.opendaylight.controller.cluster.datastore.DatastoreContext; -import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; -import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; -import org.opendaylight.controller.cluster.dom.api.LeaderLocation; -import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener; -import org.opendaylight.controller.cluster.dom.api.LeaderLocationListenerRegistration; -import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException; -import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -@Deprecated(forRemoval = true) -public class CDSShardAccessImplTest extends AbstractActorTest { - - private static final DOMDataTreeIdentifier TEST_ID = - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH); - - private CDSShardAccessImpl shardAccess; - private ActorUtils context; - - @Before - public void setUp() { - context = mock(ActorUtils.class); - final DatastoreContext datastoreContext = DatastoreContext.newBuilder().build(); - doReturn(Optional.of(getSystem().deadLetters())).when(context).findLocalShard(any()); - doReturn(datastoreContext).when(context).getDatastoreContext(); - doReturn(getSystem()).when(context).getActorSystem(); - shardAccess = new CDSShardAccessImpl(TEST_ID, context); - } - - @Test - @SuppressWarnings("checkstyle:IllegalCatch") - public void testRegisterLeaderLocationListener() { - final LeaderLocationListener listener1 = mock(LeaderLocationListener.class); - - // first registration should be OK - shardAccess.registerLeaderLocationListener(listener1); - - // second registration should fail with IllegalArgumentEx - try { - shardAccess.registerLeaderLocationListener(listener1); - fail("Should throw exception"); - } catch (final Exception e) { - assertTrue(e instanceof IllegalArgumentException); - } - - // null listener registration should fail with NPE - try { - shardAccess.registerLeaderLocationListener(null); - fail("Should throw exception"); - } catch (final Exception e) { - assertTrue(e instanceof NullPointerException); - } - - // registering listener on closed shard access should fail with IllegalStateEx - final LeaderLocationListener listener2 = mock(LeaderLocationListener.class); - shardAccess.close(); - try { - shardAccess.registerLeaderLocationListener(listener2); - fail("Should throw exception"); - } catch (final Exception ex) { - assertTrue(ex instanceof IllegalStateException); - } - } - - @Test - @SuppressWarnings("checkstyle:IllegalCatch") - public void testOnLeaderLocationChanged() { - final LeaderLocationListener listener1 = mock(LeaderLocationListener.class); - doThrow(new RuntimeException("Failed")).when(listener1).onLeaderLocationChanged(any()); - final LeaderLocationListener listener2 = mock(LeaderLocationListener.class); - doNothing().when(listener2).onLeaderLocationChanged(any()); - final LeaderLocationListener listener3 = mock(LeaderLocationListener.class); - doNothing().when(listener3).onLeaderLocationChanged(any()); - - final LeaderLocationListenerRegistration reg1 = shardAccess.registerLeaderLocationListener(listener1); - final LeaderLocationListenerRegistration reg2 = shardAccess.registerLeaderLocationListener(listener2); - final LeaderLocationListenerRegistration reg3 = shardAccess.registerLeaderLocationListener(listener3); - - // Error in listener1 should not affect dispatching change to other listeners - shardAccess.onLeaderLocationChanged(LeaderLocation.LOCAL); - verify(listener1).onLeaderLocationChanged(eq(LeaderLocation.LOCAL)); - verify(listener2).onLeaderLocationChanged(eq(LeaderLocation.LOCAL)); - verify(listener3).onLeaderLocationChanged(eq(LeaderLocation.LOCAL)); - - // Closed listeners shouldn't see new leader location changes - reg1.close(); - reg2.close(); - shardAccess.onLeaderLocationChanged(LeaderLocation.REMOTE); - verify(listener3).onLeaderLocationChanged(eq(LeaderLocation.REMOTE)); - verifyNoMoreInteractions(listener1); - verifyNoMoreInteractions(listener2); - - // Closed shard access should not dispatch any new events - shardAccess.close(); - shardAccess.onLeaderLocationChanged(LeaderLocation.UNKNOWN); - verifyNoMoreInteractions(listener1); - verifyNoMoreInteractions(listener2); - verifyNoMoreInteractions(listener3); - - reg3.close(); - } - - @Test - @SuppressWarnings("checkstyle:IllegalCatch") - public void testGetShardIdentifier() { - assertEquals(shardAccess.getShardIdentifier(), TEST_ID); - - // closed shard access should throw illegal state - shardAccess.close(); - try { - shardAccess.getShardIdentifier(); - fail("Exception expected"); - } catch (final Exception e) { - assertTrue(e instanceof IllegalStateException); - } - } - - @Test - @SuppressWarnings("checkstyle:IllegalCatch") - public void testGetLeaderLocation() { - // new shard access does not know anything about leader location - assertEquals(shardAccess.getLeaderLocation(), LeaderLocation.UNKNOWN); - - // we start getting leader location changes notifications - shardAccess.onLeaderLocationChanged(LeaderLocation.LOCAL); - assertEquals(shardAccess.getLeaderLocation(), LeaderLocation.LOCAL); - - shardAccess.onLeaderLocationChanged(LeaderLocation.REMOTE); - shardAccess.onLeaderLocationChanged(LeaderLocation.UNKNOWN); - assertEquals(shardAccess.getLeaderLocation(), LeaderLocation.UNKNOWN); - - // closed shard access throws illegal state - shardAccess.close(); - try { - shardAccess.getLeaderLocation(); - fail("Should have failed with IllegalStateEx"); - } catch (Exception e) { - assertTrue(e instanceof IllegalStateException); - } - } - - @Test - @SuppressWarnings("checkstyle:IllegalCatch") - public void testMakeLeaderLocal() throws Exception { - final FiniteDuration timeout = new FiniteDuration(5, TimeUnit.SECONDS); - final ActorRef localShardRef = mock(ActorRef.class); - final Future localShardRefFuture = Futures.successful(localShardRef); - doReturn(localShardRefFuture).when(context).findLocalShardAsync(any()); - - // MakeLeaderLocal will reply with success - doReturn(Futures.successful(null)).when(context).executeOperationAsync((ActorRef) any(), any(), any()); - doReturn(getSystem().dispatcher()).when(context).getClientDispatcher(); - assertEquals(waitOnAsyncTask(shardAccess.makeLeaderLocal(), timeout), null); - - // MakeLeaderLocal will reply with failure - doReturn(Futures.failed(new LeadershipTransferFailedException("Failure"))) - .when(context).executeOperationAsync((ActorRef) any(), any(), any()); - - try { - waitOnAsyncTask(shardAccess.makeLeaderLocal(), timeout); - fail("makeLeaderLocal operation should not be successful"); - } catch (final Exception e) { - assertTrue(e instanceof LeadershipTransferFailedException); - } - - // we don't even find local shard - doReturn(Futures.failed(new LocalShardNotFoundException("Local shard not found"))) - .when(context).findLocalShardAsync(any()); - - try { - waitOnAsyncTask(shardAccess.makeLeaderLocal(), timeout); - fail("makeLeaderLocal operation should not be successful"); - } catch (final Exception e) { - assertTrue(e instanceof LeadershipTransferFailedException); - assertTrue(e.getCause() instanceof LocalShardNotFoundException); - } - - // closed shard access should throw IllegalStateEx - shardAccess.close(); - try { - shardAccess.makeLeaderLocal(); - fail("Should have thrown IllegalStateEx. ShardAccess is closed"); - } catch (final Exception e) { - assertTrue(e instanceof IllegalStateException); - } - } -} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontendTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontendTest.java deleted file mode 100644 index aba233fe6f..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontendTest.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Copyright (c) 2016, 2017 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import static org.hamcrest.CoreMatchers.hasItems; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import java.util.Collections; -import java.util.List; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.MockitoAnnotations; -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory; -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; -import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; -import org.opendaylight.controller.cluster.datastore.DistributedDataStore; -import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; -import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; -import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; -import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; -import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; -import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; -import org.opendaylight.yangtools.yang.data.api.schema.MapNode; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; - -@Deprecated(forRemoval = true) -public class DistributedShardFrontendTest { - - private static final DOMDataTreeIdentifier ROOT = - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty()); - private static final ListenableFuture SUCCESS_FUTURE = Futures.immediateFuture(null); - - private ShardedDOMDataTree shardedDOMDataTree; - - private DataStoreClient client; - private ClientLocalHistory clientHistory; - private ClientTransaction clientTransaction; - private DOMDataTreeWriteCursor cursor; - - private static final YangInstanceIdentifier OUTER_LIST_YID = TestModel.OUTER_LIST_PATH.node( - NodeIdentifierWithPredicates.of(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); - private static final DOMDataTreeIdentifier OUTER_LIST_ID = - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, OUTER_LIST_YID); - - @Captor - private ArgumentCaptor pathArgumentCaptor; - @Captor - private ArgumentCaptor> nodeCaptor; - - private DOMStoreThreePhaseCommitCohort commitCohort; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - shardedDOMDataTree = new ShardedDOMDataTree(); - client = mock(DataStoreClient.class); - cursor = mock(DOMDataTreeWriteCursor.class); - clientTransaction = mock(ClientTransaction.class); - clientHistory = mock(ClientLocalHistory.class); - commitCohort = mock(DOMStoreThreePhaseCommitCohort.class); - - doReturn(SUCCESS_FUTURE).when(commitCohort).canCommit(); - doReturn(SUCCESS_FUTURE).when(commitCohort).preCommit(); - doReturn(SUCCESS_FUTURE).when(commitCohort).commit(); - doReturn(SUCCESS_FUTURE).when(commitCohort).abort(); - - doReturn(clientTransaction).when(client).createTransaction(); - doReturn(clientTransaction).when(clientHistory).createTransaction(); - doNothing().when(clientHistory).close(); - - doNothing().when(client).close(); - doReturn(clientHistory).when(client).createLocalHistory(); - - doReturn(cursor).when(clientTransaction).openCursor(); - doNothing().when(cursor).close(); - doNothing().when(cursor).write(any(), any()); - doNothing().when(cursor).merge(any(), any()); - doNothing().when(cursor).delete(any()); - - doReturn(commitCohort).when(clientTransaction).ready(); - } - - @Test - public void testClientTransaction() throws Exception { - final DistributedDataStore distributedDataStore = mock(DistributedDataStore.class); - final ActorUtils context = mock(ActorUtils.class); - doReturn(context).when(distributedDataStore).getActorUtils(); - doReturn(SchemaContextHelper.full()).when(context).getSchemaContext(); - - final DistributedShardFrontend rootShard = new DistributedShardFrontend(distributedDataStore, client, ROOT); - - try (DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(Collections.singletonList(ROOT))) { - shardedDOMDataTree.registerDataTreeShard(ROOT, rootShard, producer); - } - - final DataStoreClient outerListClient = mock(DataStoreClient.class); - final ClientTransaction outerListClientTransaction = mock(ClientTransaction.class); - final ClientLocalHistory outerListClientHistory = mock(ClientLocalHistory.class); - final DOMDataTreeWriteCursor outerListCursor = mock(DOMDataTreeWriteCursor.class); - - doNothing().when(outerListCursor).close(); - doNothing().when(outerListCursor).write(any(), any()); - doNothing().when(outerListCursor).merge(any(), any()); - doNothing().when(outerListCursor).delete(any()); - - doReturn(outerListCursor).when(outerListClientTransaction).openCursor(); - doReturn(outerListClientTransaction).when(outerListClient).createTransaction(); - doReturn(outerListClientHistory).when(outerListClient).createLocalHistory(); - doReturn(outerListClientTransaction).when(outerListClientHistory).createTransaction(); - - doReturn(commitCohort).when(outerListClientTransaction).ready(); - - doNothing().when(outerListClientHistory).close(); - doNothing().when(outerListClient).close(); - - final DistributedShardFrontend outerListShard = new DistributedShardFrontend( - distributedDataStore, outerListClient, OUTER_LIST_ID); - try (DOMDataTreeProducer producer = - shardedDOMDataTree.createProducer(Collections.singletonList(OUTER_LIST_ID))) { - shardedDOMDataTree.registerDataTreeShard(OUTER_LIST_ID, outerListShard, producer); - } - - final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(Collections.singletonList(ROOT)); - final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(false); - final DOMDataTreeWriteCursor txCursor = tx.createCursor(ROOT); - - assertNotNull(txCursor); - txCursor.write(TestModel.TEST_PATH.getLastPathArgument(), createCrossShardContainer()); - - //check the lower shard got the correct modification - verify(outerListCursor, times(2)).write(pathArgumentCaptor.capture(), nodeCaptor.capture()); - - final List capturedArgs = pathArgumentCaptor.getAllValues(); - assertEquals(2, capturedArgs.size()); - assertThat(capturedArgs, - hasItems(new NodeIdentifier(TestModel.ID_QNAME), new NodeIdentifier(TestModel.INNER_LIST_QNAME))); - - final List> capturedValues = nodeCaptor.getAllValues(); - assertEquals(2, capturedValues.size()); - assertThat(capturedValues, - hasItems(ImmutableNodes.leafNode(TestModel.ID_QNAME, 1), createInnerMapNode(1))); - - txCursor.close(); - tx.commit().get(); - - verify(commitCohort, times(2)).canCommit(); - verify(commitCohort, times(2)).preCommit(); - verify(commitCohort, times(2)).commit(); - } - - private static MapNode createInnerMapNode(final int id) { - final MapEntryNode listEntry = ImmutableNodes - .mapEntryBuilder(TestModel.INNER_LIST_QNAME, TestModel.NAME_QNAME, "name-" + id) - .withChild(ImmutableNodes.leafNode(TestModel.NAME_QNAME, "name-" + id)) - .withChild(ImmutableNodes.leafNode(TestModel.VALUE_QNAME, "value-" + id)) - .build(); - - return ImmutableNodes.mapNodeBuilder(TestModel.INNER_LIST_QNAME).withChild(listEntry).build(); - } - - private static ContainerNode createCrossShardContainer() { - - final MapEntryNode outerListEntry1 = - ImmutableNodes.mapEntryBuilder(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1) - .withChild(createInnerMapNode(1)) - .build(); - final MapEntryNode outerListEntry2 = - ImmutableNodes.mapEntryBuilder(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2) - .withChild(createInnerMapNode(2)) - .build(); - - final MapNode outerList = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME) - .withChild(outerListEntry1) - .withChild(outerListEntry2) - .build(); - - final ContainerNode testContainer = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME)) - .withChild(outerList) - .build(); - - return testContainer; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java deleted file mode 100644 index 749d69987d..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java +++ /dev/null @@ -1,437 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.sharding; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.doReturn; -import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard; -import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Address; -import akka.actor.AddressFromURIString; -import akka.cluster.Cluster; -import akka.testkit.javadsl.TestKit; -import com.google.common.collect.Lists; -import com.typesafe.config.ConfigFactory; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; -import org.opendaylight.controller.cluster.ActorSystemProvider; -import org.opendaylight.controller.cluster.datastore.AbstractTest; -import org.opendaylight.controller.cluster.datastore.DatastoreContext; -import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; -import org.opendaylight.controller.cluster.datastore.DistributedDataStore; -import org.opendaylight.controller.cluster.datastore.IntegrationTestKit; -import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; -import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; -import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; -import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; -import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; -import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException; -import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Deprecated(forRemoval = true) -public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { - - private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class); - - private static final Address MEMBER_1_ADDRESS = - AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"); - - private static final DOMDataTreeIdentifier TEST_ID = - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH); - - private static final String MODULE_SHARDS_CONFIG = "module-shards-default.conf"; - - private ActorSystem leaderSystem; - private ActorSystem followerSystem; - - - private final Builder leaderDatastoreContextBuilder = - DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); - - private final DatastoreContext.Builder followerDatastoreContextBuilder = - DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); - - private DistributedDataStore leaderConfigDatastore; - private DistributedDataStore leaderOperDatastore; - - private DistributedDataStore followerConfigDatastore; - private DistributedDataStore followerOperDatastore; - - - private IntegrationTestKit followerTestKit; - private IntegrationTestKit leaderTestKit; - private DistributedShardedDOMDataTree leaderShardFactory; - - private DistributedShardedDOMDataTree followerShardFactory; - private ActorSystemProvider leaderSystemProvider; - private ActorSystemProvider followerSystemProvider; - - @Before - public void setUp() { - InMemoryJournal.clear(); - InMemorySnapshotStore.clear(); - - leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); - Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS); - - followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); - Cluster.get(followerSystem).join(MEMBER_1_ADDRESS); - - leaderSystemProvider = Mockito.mock(ActorSystemProvider.class); - doReturn(leaderSystem).when(leaderSystemProvider).getActorSystem(); - - followerSystemProvider = Mockito.mock(ActorSystemProvider.class); - doReturn(followerSystem).when(followerSystemProvider).getActorSystem(); - - } - - @After - public void tearDown() { - if (leaderConfigDatastore != null) { - leaderConfigDatastore.close(); - } - if (leaderOperDatastore != null) { - leaderOperDatastore.close(); - } - - if (followerConfigDatastore != null) { - followerConfigDatastore.close(); - } - if (followerOperDatastore != null) { - followerOperDatastore.close(); - } - - TestKit.shutdownActorSystem(leaderSystem, true); - TestKit.shutdownActorSystem(followerSystem, true); - - InMemoryJournal.clear(); - InMemorySnapshotStore.clear(); - } - - private void initEmptyDatastores() throws Exception { - initEmptyDatastores(MODULE_SHARDS_CONFIG); - } - - private void initEmptyDatastores(final String moduleShardsConfig) throws Exception { - leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder); - - leaderConfigDatastore = leaderTestKit.setupDistributedDataStore( - "config", moduleShardsConfig, true, - SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext()); - leaderOperDatastore = leaderTestKit.setupDistributedDataStore( - "operational", moduleShardsConfig, true, - SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext()); - - leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider, - leaderOperDatastore, - leaderConfigDatastore); - - followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder); - - followerConfigDatastore = followerTestKit.setupDistributedDataStore( - "config", moduleShardsConfig, true, SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext()); - followerOperDatastore = followerTestKit.setupDistributedDataStore( - "operational", moduleShardsConfig, true, - SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext()); - - followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider, - followerOperDatastore, - followerConfigDatastore); - - followerTestKit.waitForMembersUp("member-1"); - - LOG.info("Initializing leader DistributedShardedDOMDataTree"); - leaderShardFactory.init(); - - leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(YangInstanceIdentifier.empty())); - - leaderTestKit.waitUntilLeader(leaderOperDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(YangInstanceIdentifier.empty())); - - LOG.info("Initializing follower DistributedShardedDOMDataTree"); - followerShardFactory.init(); - } - - @Test - public void testProducerRegistrations() throws Exception { - LOG.info("testProducerRegistrations starting"); - initEmptyDatastores(); - - leaderTestKit.waitForMembersUp("member-2"); - - // TODO refactor shard creation and verification to own method - final DistributedShardRegistration shardRegistration = - waitOnAsyncTask(leaderShardFactory.createDistributedShard( - TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), - DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - - leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); - - final ActorRef leaderShardManager = leaderConfigDatastore.getActorUtils().getShardManager(); - - assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()))); - - assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()))); - - final Set peers = new HashSet<>(); - IntegrationTestKit.verifyShardState(leaderConfigDatastore, - ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState -> - peers.addAll(onDemandShardState.getPeerAddresses().values())); - assertEquals(peers.size(), 1); - - final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID)); - try { - followerShardFactory.createProducer(Collections.singleton(TEST_ID)); - fail("Producer should be already registered on the other node"); - } catch (final IllegalArgumentException e) { - assertTrue(e.getMessage().contains("is attached to producer")); - } - - producer.close(); - - final DOMDataTreeProducer followerProducer = - followerShardFactory.createProducer(Collections.singleton(TEST_ID)); - try { - leaderShardFactory.createProducer(Collections.singleton(TEST_ID)); - fail("Producer should be already registered on the other node"); - } catch (final IllegalArgumentException e) { - assertTrue(e.getMessage().contains("is attached to producer")); - } - - followerProducer.close(); - // try to create a shard on an already registered prefix on follower - try { - waitOnAsyncTask(followerShardFactory.createDistributedShard( - TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), - DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - fail("This prefix already should have a shard registration that was forwarded from the other node"); - } catch (final DOMDataTreeShardingConflictException e) { - assertTrue(e.getMessage().contains("is already occupied by another shard")); - } - - shardRegistration.close().toCompletableFuture().get(); - - LOG.info("testProducerRegistrations ending"); - } - - @Test - public void testWriteIntoMultipleShards() throws Exception { - LOG.info("testWriteIntoMultipleShards starting"); - initEmptyDatastores(); - - leaderTestKit.waitForMembersUp("member-2"); - - LOG.debug("registering first shard"); - final DistributedShardRegistration shardRegistration = - waitOnAsyncTask(leaderShardFactory.createDistributedShard( - TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), - DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - - - leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); - findLocalShard(followerConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); - - final Set peers = new HashSet<>(); - IntegrationTestKit.verifyShardState(leaderConfigDatastore, - ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState -> - peers.addAll(onDemandShardState.getPeerAddresses().values())); - assertEquals(peers.size(), 1); - - LOG.debug("Got after waiting for nonleader"); - final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID)); - - final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true); - final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID); - Assert.assertNotNull(cursor); - final YangInstanceIdentifier nameId = - YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build(); - cursor.write(nameId.getLastPathArgument(), - ImmutableLeafNodeBuilder.create().withNodeIdentifier( - new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build()); - - cursor.close(); - LOG.warn("Got to pre submit"); - - tx.commit().get(); - - shardRegistration.close().toCompletableFuture().get(); - - LOG.info("testWriteIntoMultipleShards ending"); - } - - @Test - public void testMultipleShardRegistrations() throws Exception { - LOG.info("testMultipleShardRegistrations starting"); - initEmptyDatastores(); - - final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard( - TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), - DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - - final DistributedShardRegistration reg2 = waitOnAsyncTask(leaderShardFactory.createDistributedShard( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH), - Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), - DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - - final DistributedShardRegistration reg3 = waitOnAsyncTask(leaderShardFactory.createDistributedShard( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH), - Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), - DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - - final DistributedShardRegistration reg4 = waitOnAsyncTask(leaderShardFactory.createDistributedShard( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH), - Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), - DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - - leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)); - leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)); - leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)); - - // check leader has local shards - assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); - - assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH))); - - assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH))); - - assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.JUNK_PATH))); - - // check follower has local shards - assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); - - assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH))); - - assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH))); - - assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.JUNK_PATH))); - - LOG.debug("Closing registrations"); - - reg1.close().toCompletableFuture().get(); - reg2.close().toCompletableFuture().get(); - reg3.close().toCompletableFuture().get(); - reg4.close().toCompletableFuture().get(); - - waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - - waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)); - - waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)); - - waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)); - - LOG.debug("All leader shards gone"); - - waitUntilShardIsDown(followerConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - - waitUntilShardIsDown(followerConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)); - - waitUntilShardIsDown(followerConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)); - - waitUntilShardIsDown(followerConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)); - - LOG.debug("All follower shards gone"); - LOG.info("testMultipleShardRegistrations ending"); - } - - @Test - public void testMultipleRegistrationsAtOnePrefix() throws Exception { - LOG.info("testMultipleRegistrationsAtOnePrefix starting"); - initEmptyDatastores(); - - for (int i = 0; i < 5; i++) { - LOG.info("Round {}", i); - final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard( - TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), - DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - - leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - - assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); - - assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); - - - final Set peers = new HashSet<>(); - IntegrationTestKit.verifyShardState(leaderConfigDatastore, - ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState -> - peers.addAll(onDemandShardState.getPeerAddresses().values())); - assertEquals(peers.size(), 1); - - waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - - waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - - waitUntilShardIsDown(followerConfigDatastore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - } - - LOG.info("testMultipleRegistrationsAtOnePrefix ending"); - } - - @Test - public void testInitialBootstrappingWithNoModuleShards() throws Exception { - LOG.info("testInitialBootstrappingWithNoModuleShards starting"); - initEmptyDatastores("module-shards-default-member-1.conf"); - - // We just verify the DistributedShardedDOMDataTree initialized without error. - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java deleted file mode 100644 index 602e2cc407..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java +++ /dev/null @@ -1,551 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.sharding; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.anyCollection; -import static org.mockito.ArgumentMatchers.anyMap; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard; -import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Address; -import akka.actor.AddressFromURIString; -import akka.actor.Props; -import akka.cluster.Cluster; -import akka.testkit.javadsl.TestKit; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.FluentFuture; -import com.google.common.util.concurrent.ListenableFuture; -import com.typesafe.config.ConfigFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; -import org.opendaylight.controller.cluster.ActorSystemProvider; -import org.opendaylight.controller.cluster.access.concepts.MemberName; -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory; -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; -import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; -import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor; -import org.opendaylight.controller.cluster.datastore.AbstractTest; -import org.opendaylight.controller.cluster.datastore.DatastoreContext; -import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; -import org.opendaylight.controller.cluster.datastore.DistributedDataStore; -import org.opendaylight.controller.cluster.datastore.IntegrationTestKit; -import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; -import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; -import org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer; -import org.opendaylight.controller.cluster.dom.api.CDSShardAccess; -import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; -import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; -import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; -import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeListener; -import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; -import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; -import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; -import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; -import org.opendaylight.yangtools.yang.data.api.schema.MapNode; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapNodeBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Deprecated(forRemoval = true) -public class DistributedShardedDOMDataTreeTest extends AbstractTest { - - private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class); - - private static final Address MEMBER_1_ADDRESS = - AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); - - private static final DOMDataTreeIdentifier TEST_ID = - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH); - - private static final DOMDataTreeIdentifier INNER_LIST_ID = - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, - YangInstanceIdentifier.create(getOuterListIdFor(0).getPathArguments()) - .node(TestModel.INNER_LIST_QNAME)); - private static final Set SINGLE_MEMBER = Collections.singleton(AbstractTest.MEMBER_NAME); - - private static final String MODULE_SHARDS_CONFIG = "module-shards-default-member-1.conf"; - - private ActorSystem leaderSystem; - - private final Builder leaderDatastoreContextBuilder = - DatastoreContext.newBuilder() - .shardHeartbeatIntervalInMillis(100) - .shardElectionTimeoutFactor(2) - .logicalStoreType(LogicalDatastoreType.CONFIGURATION); - - private DistributedDataStore leaderDistributedDataStore; - private DistributedDataStore operDistributedDatastore; - private IntegrationTestKit leaderTestKit; - - private DistributedShardedDOMDataTree leaderShardFactory; - - @Captor - private ArgumentCaptor> captorForChanges; - @Captor - private ArgumentCaptor>> captorForSubtrees; - - private ActorSystemProvider leaderSystemProvider; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - - InMemoryJournal.clear(); - InMemorySnapshotStore.clear(); - - leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); - Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS); - - leaderSystemProvider = Mockito.mock(ActorSystemProvider.class); - doReturn(leaderSystem).when(leaderSystemProvider).getActorSystem(); - } - - @After - public void tearDown() { - if (leaderDistributedDataStore != null) { - leaderDistributedDataStore.close(); - } - - if (operDistributedDatastore != null) { - operDistributedDatastore.close(); - } - - TestKit.shutdownActorSystem(leaderSystem); - - InMemoryJournal.clear(); - InMemorySnapshotStore.clear(); - } - - private void initEmptyDatastores() throws Exception { - leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder); - - leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore( - "config", MODULE_SHARDS_CONFIG, "empty-modules.conf", true, - SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext()); - - operDistributedDatastore = leaderTestKit.setupDistributedDataStore( - "operational", MODULE_SHARDS_CONFIG, "empty-modules.conf",true, - SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext()); - - leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider, - operDistributedDatastore, - leaderDistributedDataStore); - - leaderShardFactory.init(); - } - - - @Test - public void testWritesIntoDefaultShard() throws Exception { - initEmptyDatastores(); - - final DOMDataTreeIdentifier configRoot = - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty()); - - final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(configRoot)); - - final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true); - final DOMDataTreeWriteCursor cursor = - tx.createCursor(new DOMDataTreeIdentifier( - LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty())); - Assert.assertNotNull(cursor); - - final ContainerNode test = - ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME)).build(); - - cursor.write(test.getIdentifier(), test); - cursor.close(); - - tx.commit().get(); - } - - @Test - public void testSingleNodeWritesAndRead() throws Exception { - initEmptyDatastores(); - - final DistributedShardRegistration shardRegistration = waitOnAsyncTask( - leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)), - DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), - ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); - - final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID)); - - final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true); - final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID); - Assert.assertNotNull(cursor); - final YangInstanceIdentifier nameId = - YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build(); - final LeafNode valueToCheck = ImmutableLeafNodeBuilder.create().withNodeIdentifier( - new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build(); - LOG.debug("Writing data {} at {}, cursor {}", nameId.getLastPathArgument(), valueToCheck, cursor); - cursor.write(nameId.getLastPathArgument(), - valueToCheck); - - cursor.close(); - LOG.debug("Got to pre submit"); - - tx.commit().get(); - - final DOMDataTreeListener mockedDataTreeListener = mock(DOMDataTreeListener.class); - doNothing().when(mockedDataTreeListener).onDataTreeChanged(anyCollection(), anyMap()); - - leaderShardFactory.registerListener(mockedDataTreeListener, Collections.singletonList(TEST_ID), - true, Collections.emptyList()); - - verify(mockedDataTreeListener, timeout(1000).times(1)).onDataTreeChanged(captorForChanges.capture(), - captorForSubtrees.capture()); - final List> capturedValue = captorForChanges.getAllValues(); - - final Optional> dataAfter = - capturedValue.get(0).iterator().next().getRootNode().getDataAfter(); - - final NormalizedNode expected = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME)).withChild(valueToCheck).build(); - assertEquals(expected, dataAfter.get()); - - verifyNoMoreInteractions(mockedDataTreeListener); - - final String shardName = ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()); - LOG.debug("Creating distributed datastore client for shard {}", shardName); - - final ActorUtils actorUtils = leaderDistributedDataStore.getActorUtils(); - final Props distributedDataStoreClientProps = - SimpleDataStoreClientActor.props(actorUtils.getCurrentMemberName(), "Shard-" + shardName, actorUtils, - shardName); - - final ActorRef clientActor = leaderSystem.actorOf(distributedDataStoreClientProps); - final DataStoreClient distributedDataStoreClient = SimpleDataStoreClientActor - .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS); - - final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory(); - final ClientTransaction tx2 = localHistory.createTransaction(); - final FluentFuture>> read = tx2.read(YangInstanceIdentifier.empty()); - - final Optional> optional = read.get(); - tx2.abort(); - localHistory.close(); - - shardRegistration.close().toCompletableFuture().get(); - - } - - @Test - public void testMultipleWritesIntoSingleMapEntry() throws Exception { - initEmptyDatastores(); - - final DistributedShardRegistration shardRegistration = waitOnAsyncTask( - leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)), - DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), - ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); - - LOG.warn("Got after waiting for nonleader"); - final ActorRef leaderShardManager = leaderDistributedDataStore.getActorUtils().getShardManager(); - - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - - final YangInstanceIdentifier oid1 = getOuterListIdFor(0); - final DOMDataTreeIdentifier outerListPath = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, oid1); - - final DistributedShardRegistration outerListShardReg = waitOnAsyncTask( - leaderShardFactory.createDistributedShard(outerListPath, Lists.newArrayList(AbstractTest.MEMBER_NAME)), - DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), - ClusterUtils.getCleanShardName(outerListPath.getRootIdentifier())); - - final DOMDataTreeProducer shardProducer = leaderShardFactory.createProducer( - Collections.singletonList(outerListPath)); - - final DOMDataTreeCursorAwareTransaction tx = shardProducer.createTransaction(false); - final DOMDataTreeWriteCursor cursor = - tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, oid1)); - assertNotNull(cursor); - - MapNode innerList = ImmutableMapNodeBuilder - .create() - .withNodeIdentifier(new NodeIdentifier(TestModel.INNER_LIST_QNAME)) - .build(); - - cursor.write(new NodeIdentifier(TestModel.INNER_LIST_QNAME), innerList); - cursor.close(); - tx.commit().get(); - - final ArrayList> futures = new ArrayList<>(); - for (int i = 0; i < 1000; i++) { - final Collection innerListMapEntries = createInnerListMapEntries(1000, "run-" + i); - for (final MapEntryNode innerListMapEntry : innerListMapEntries) { - final DOMDataTreeCursorAwareTransaction tx1 = shardProducer.createTransaction(false); - final DOMDataTreeWriteCursor cursor1 = tx1.createCursor( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, - oid1.node(new NodeIdentifier(TestModel.INNER_LIST_QNAME)))); - cursor1.write(innerListMapEntry.getIdentifier(), innerListMapEntry); - cursor1.close(); - futures.add(tx1.commit()); - } - } - - futures.get(futures.size() - 1).get(); - - final DOMDataTreeListener mockedDataTreeListener = mock(DOMDataTreeListener.class); - doNothing().when(mockedDataTreeListener).onDataTreeChanged(anyCollection(), anyMap()); - - leaderShardFactory.registerListener(mockedDataTreeListener, Collections.singletonList(INNER_LIST_ID), - true, Collections.emptyList()); - - verify(mockedDataTreeListener, timeout(1000).times(1)).onDataTreeChanged(captorForChanges.capture(), - captorForSubtrees.capture()); - verifyNoMoreInteractions(mockedDataTreeListener); - final List> capturedValue = captorForChanges.getAllValues(); - - final NormalizedNode expected = - ImmutableMapNodeBuilder - .create() - .withNodeIdentifier(new NodeIdentifier(TestModel.INNER_LIST_QNAME)) - // only the values from the last run should be present - .withValue(createInnerListMapEntries(1000, "run-999")) - .build(); - - assertEquals("List values dont match the expected values from the last run", - expected, capturedValue.get(0).iterator().next().getRootNode().getDataAfter().get()); - - } - - // top level shard at TEST element, with subshards on each outer-list map entry - @Test - @Ignore - public void testMultipleShardLevels() throws Exception { - initEmptyDatastores(); - - final DistributedShardRegistration testShardReg = waitOnAsyncTask( - leaderShardFactory.createDistributedShard(TEST_ID, SINGLE_MEMBER), - DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - - final ArrayList registrations = new ArrayList<>(); - final int listSize = 5; - for (int i = 0; i < listSize; i++) { - final YangInstanceIdentifier entryYID = getOuterListIdFor(i); - final CompletionStage future = leaderShardFactory.createDistributedShard( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, entryYID), SINGLE_MEMBER); - - registrations.add(waitOnAsyncTask(future, DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION)); - } - - final DOMDataTreeIdentifier rootId = - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty()); - final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singletonList( - rootId)); - - DOMDataTreeCursorAwareTransaction transaction = producer.createTransaction(false); - - DOMDataTreeWriteCursor cursor = transaction.createCursor(rootId); - assertNotNull(cursor); - - final MapNode outerList = - ImmutableMapNodeBuilder.create() - .withNodeIdentifier(new NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build(); - - final ContainerNode testNode = - ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME)) - .withChild(outerList) - .build(); - - cursor.write(testNode.getIdentifier(), testNode); - - cursor.close(); - transaction.commit().get(); - - final DOMDataTreeListener mockedDataTreeListener = mock(DOMDataTreeListener.class); - doNothing().when(mockedDataTreeListener).onDataTreeChanged(anyCollection(), anyMap()); - - final MapNode wholeList = ImmutableMapNodeBuilder.create(outerList) - .withValue(createOuterEntries(listSize, "testing-values")).build(); - - transaction = producer.createTransaction(false); - cursor = transaction.createCursor(TEST_ID); - assertNotNull(cursor); - - cursor.write(wholeList.getIdentifier(), wholeList); - cursor.close(); - - transaction.commit().get(); - - leaderShardFactory.registerListener(mockedDataTreeListener, Collections.singletonList(TEST_ID), - true, Collections.emptyList()); - - verify(mockedDataTreeListener, timeout(35000).atLeast(2)).onDataTreeChanged(captorForChanges.capture(), - captorForSubtrees.capture()); - verifyNoMoreInteractions(mockedDataTreeListener); - final List>> allSubtrees = captorForSubtrees.getAllValues(); - - final Map> lastSubtree = allSubtrees.get(allSubtrees.size() - 1); - - final NormalizedNode actual = lastSubtree.get(TEST_ID); - assertNotNull(actual); - - final NormalizedNode expected = - ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME)) - .withChild(ImmutableMapNodeBuilder.create(outerList) - .withValue(createOuterEntries(listSize, "testing-values")).build()) - .build(); - - - for (final DistributedShardRegistration registration : registrations) { - waitOnAsyncTask(registration.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - } - - waitOnAsyncTask(testShardReg.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - - assertEquals(expected, actual); - } - - @Test - public void testMultipleRegistrationsAtOnePrefix() throws Exception { - initEmptyDatastores(); - - for (int i = 0; i < 10; i++) { - LOG.debug("Round {}", i); - final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard( - TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)), - DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - - assertNotNull(findLocalShard(leaderDistributedDataStore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); - - waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - - waitUntilShardIsDown(leaderDistributedDataStore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - } - } - - @Test - public void testCDSDataTreeProducer() throws Exception { - initEmptyDatastores(); - - final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard( - TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)), - DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - - assertNotNull(findLocalShard(leaderDistributedDataStore.getActorUtils(), - ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); - - - final DOMDataTreeIdentifier configRoot = - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty()); - final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(configRoot)); - - assertTrue(producer instanceof CDSDataTreeProducer); - - final CDSDataTreeProducer cdsProducer = (CDSDataTreeProducer) producer; - CDSShardAccess shardAccess = cdsProducer.getShardAccess(TEST_ID); - assertEquals(shardAccess.getShardIdentifier(), TEST_ID); - - shardAccess = cdsProducer.getShardAccess(INNER_LIST_ID); - assertEquals(TEST_ID, shardAccess.getShardIdentifier()); - - shardAccess = cdsProducer.getShardAccess(configRoot); - assertEquals(configRoot, shardAccess.getShardIdentifier()); - - waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - } - - private static Collection createOuterEntries(final int amount, final String valuePrefix) { - final Collection ret = new ArrayList<>(); - for (int i = 0; i < amount; i++) { - ret.add(ImmutableNodes.mapEntryBuilder() - .withNodeIdentifier(NodeIdentifierWithPredicates.of(TestModel.OUTER_LIST_QNAME, - QName.create(TestModel.OUTER_LIST_QNAME, "id"), i)) - .withChild(ImmutableNodes - .leafNode(QName.create(TestModel.OUTER_LIST_QNAME, "id"), i)) - .withChild(createWholeInnerList(amount, "outer id: " + i + " " + valuePrefix)) - .build()); - } - - return ret; - } - - private static MapNode createWholeInnerList(final int amount, final String valuePrefix) { - return ImmutableMapNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.INNER_LIST_QNAME)) - .withValue(createInnerListMapEntries(amount, valuePrefix)).build(); - } - - private static Collection createInnerListMapEntries(final int amount, final String valuePrefix) { - final Collection ret = new ArrayList<>(); - for (int i = 0; i < amount; i++) { - ret.add(ImmutableNodes.mapEntryBuilder() - .withNodeIdentifier(NodeIdentifierWithPredicates.of(TestModel.INNER_LIST_QNAME, - QName.create(TestModel.INNER_LIST_QNAME, "name"), Integer.toString(i))) - .withChild(ImmutableNodes - .leafNode(QName.create(TestModel.INNER_LIST_QNAME, "value"), valuePrefix + "-" + i)) - .build()); - } - - return ret; - } - - private static YangInstanceIdentifier getOuterListIdFor(final int id) { - return TestModel.OUTER_LIST_PATH.node(NodeIdentifierWithPredicates.of( - TestModel.OUTER_LIST_QNAME, QName.create(TestModel.OUTER_LIST_QNAME, "id"), id)); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActorTest.java deleted file mode 100644 index 74b721c87b..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActorTest.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import static akka.actor.ActorRef.noSender; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; - -import akka.actor.ActorRef; -import akka.actor.Props; -import akka.testkit.javadsl.TestKit; -import org.junit.Test; -import org.opendaylight.controller.cluster.datastore.AbstractActorTest; -import org.opendaylight.controller.cluster.dom.api.LeaderLocation; -import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener; -import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; -import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; - -@Deprecated(forRemoval = true) -public class RoleChangeListenerActorTest extends AbstractActorTest { - - @Test - public void testRegisterRoleChangeListenerOnStart() { - final TestKit testKit = new TestKit(getSystem()); - final LeaderLocationListener listener = mock(LeaderLocationListener.class); - final Props props = RoleChangeListenerActor.props(testKit.getRef(), listener); - - getSystem().actorOf(props, "testRegisterRoleChangeListenerOnStart"); - testKit.expectMsgClass(RegisterRoleChangeListener.class); - } - - @Test - public void testOnDataTreeChanged() { - final LeaderLocationListener listener = mock(LeaderLocationListener.class); - doNothing().when(listener).onLeaderLocationChanged(any()); - final Props props = RoleChangeListenerActor.props(getSystem().deadLetters(), listener); - - final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedChanged"); - - subject.tell(new LeaderStateChanged("member-1", null, (short) 0), noSender()); - verify(listener, timeout(5000)).onLeaderLocationChanged(eq(LeaderLocation.UNKNOWN)); - - subject.tell(new LeaderStateChanged("member-1", "member-1", (short) 0), noSender()); - verify(listener, timeout(5000)).onLeaderLocationChanged(eq(LeaderLocation.LOCAL)); - - subject.tell(new LeaderStateChanged("member-1", "member-2", (short) 0), noSender()); - verify(listener, timeout(5000)).onLeaderLocationChanged(eq(LeaderLocation.REMOTE)); - } -} \ No newline at end of file diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java index e7880ac1bf..abd725ebac 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java @@ -7,19 +7,14 @@ */ package org.opendaylight.controller.clustering.it.provider; -import static akka.actor.ActorRef.noSender; - import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.actor.PoisonPill; -import akka.actor.Props; import akka.dispatch.OnComplete; import akka.pattern.Patterns; import com.google.common.base.Strings; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -27,22 +22,13 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.opendaylight.controller.cluster.ActorSystemProvider; -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory; -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; -import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; -import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor; import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; -import org.opendaylight.controller.cluster.sharding.DistributedShardFactory; import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService; import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService; -import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener; import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener; -import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler; -import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler; -import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler; import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask; import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService; import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService; @@ -57,9 +43,7 @@ import org.opendaylight.mdsal.dom.api.DOMDataBroker; import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException; import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction; -import org.opendaylight.mdsal.dom.api.DOMDataTreeService; import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration; import org.opendaylight.mdsal.dom.api.DOMRpcProviderService; import org.opendaylight.mdsal.dom.api.DOMSchemaService; @@ -110,7 +94,6 @@ import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.l import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutputBuilder; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder; @@ -133,7 +116,6 @@ import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.l import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder; @@ -158,9 +140,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService private final RpcProviderService rpcRegistry; private final ObjectRegistration registration; - private final DistributedShardFactory distributedShardFactory; private final DistributedDataStoreInterface configDataStore; - private final DOMDataTreeService domDataTreeService; private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer; private final DOMDataBroker domDataBroker; private final NotificationPublishService notificationPublishService; @@ -168,8 +148,6 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService private final DOMSchemaService schemaService; private final ClusterSingletonServiceProvider singletonService; private final DOMRpcProviderService domRpcService; - private final PrefixLeaderHandler prefixLeaderHandler; - private final PrefixShardHandler prefixShardHandler; private final DOMDataTreeChangeService domDataTreeChangeService; private final ActorSystem actorSystem; @@ -184,10 +162,6 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService private ListenerRegistration dtclReg; private IdIntsListener idIntsListener; private final Map publishNotificationsTasks = new HashMap<>(); - private ListenerRegistration ddtlReg; - private IdIntsDOMDataTreeLIstener idIntsDdtl; - - public MdsalLowLevelTestProvider(final RpcProviderService rpcRegistry, final DOMRpcProviderService domRpcService, @@ -197,8 +171,6 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService final NotificationPublishService notificationPublishService, final NotificationService notificationService, final DOMDataBroker domDataBroker, - final DOMDataTreeService domDataTreeService, - final DistributedShardFactory distributedShardFactory, final DistributedDataStoreInterface configDataStore, final ActorSystemProvider actorSystemProvider) { this.rpcRegistry = rpcRegistry; @@ -209,18 +181,12 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService this.notificationPublishService = notificationPublishService; this.notificationService = notificationService; this.domDataBroker = domDataBroker; - this.domDataTreeService = domDataTreeService; - this.distributedShardFactory = distributedShardFactory; this.configDataStore = configDataStore; this.actorSystem = actorSystemProvider.getActorSystem(); - this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer); domDataTreeChangeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class); registration = rpcRegistry.registerRpcImplementation(OdlMdsalLowlevelControlService.class, this); - - prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService, - bindingNormalizedNodeSerializer); } @Override @@ -313,17 +279,13 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService @Override public ListenableFuture> removePrefixShard(final RemovePrefixShardInput input) { - LOG.info("In removePrefixShard - input: {}", input); - - return prefixShardHandler.onRemovePrefixShard(input); + throw new UnsupportedOperationException(); } @Override public ListenableFuture> becomePrefixLeader( final BecomePrefixLeaderInput input) { - LOG.info("n becomePrefixLeader - input: {}", input); - - return prefixLeaderHandler.makeLeaderLocal(input); + throw new UnsupportedOperationException(); } @Override @@ -400,32 +362,12 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService @Override public ListenableFuture> addShardReplica(final AddShardReplicaInput input) { - return null; + throw new UnsupportedOperationException(); } @Override public ListenableFuture> subscribeDdtl(final SubscribeDdtlInput input) { - LOG.info("In subscribeDdtl"); - - if (ddtlReg != null) { - return RpcResultBuilder.failed().withError(ErrorType.RPC, - "data-exists", "There is already a listener registered for id-ints").buildFuture(); - } - - idIntsDdtl = new IdIntsDOMDataTreeLIstener(); - - try { - ddtlReg = domDataTreeService.registerListener(idIntsDdtl, - Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, - ProduceTransactionsHandler.ID_INT_YID)), - true, Collections.emptyList()); - } catch (DOMDataTreeLoopException e) { - LOG.error("Failed to register DOMDataTreeListener", e); - return RpcResultBuilder.failed().withError( - ErrorType.APPLICATION, "Failed to register DOMDataTreeListener", e).buildFuture(); - } - - return RpcResultBuilder.success(new SubscribeDdtlOutputBuilder().build()).buildFuture(); + throw new UnsupportedOperationException(); } @Override @@ -524,9 +466,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService @Override public ListenableFuture> createPrefixShard(final CreatePrefixShardInput input) { - LOG.info("In createPrefixShard - input: {}", input); - - return prefixShardHandler.onCreatePrefixShard(input); + throw new UnsupportedOperationException(); } @Override @@ -581,8 +521,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService @Override public ListenableFuture> produceTransactions( final ProduceTransactionsInput input) { - LOG.info("In produceTransactions - input: {}", input); - return ProduceTransactionsHandler.start(domDataTreeService, input); + throw new UnsupportedOperationException(); } @Override @@ -675,80 +614,12 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService @Override public ListenableFuture> unregisterDefaultConstant( final UnregisterDefaultConstantInput input) { - return null; + throw new UnsupportedOperationException(); } @Override @SuppressWarnings("checkstyle:IllegalCatch") public ListenableFuture> unsubscribeDdtl(final UnsubscribeDdtlInput input) { - LOG.info("In unsubscribeDdtl"); - - if (idIntsDdtl == null || ddtlReg == null) { - return RpcResultBuilder.failed().withError( - ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture(); - } - - long timeout = 120L; - try { - idIntsDdtl.tryFinishProcessing().get(timeout, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - LOG.error("Unable to finish notification processing", e); - return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, - "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture(); - } - - ddtlReg.close(); - ddtlReg = null; - - if (!idIntsDdtl.hasTriggered()) { - return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, - "No notification received.", "id-ints listener has not received any notifications").buildFuture(); - } - - final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID); - LOG.debug("Creating distributed datastore client for shard {}", shardName); - - final ActorUtils actorUtils = configDataStore.getActorUtils(); - final Props distributedDataStoreClientProps = - SimpleDataStoreClientActor.props(actorUtils.getCurrentMemberName(), - "Shard-" + shardName, actorUtils, shardName); - - final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps); - final DataStoreClient distributedDataStoreClient; - try { - distributedDataStoreClient = SimpleDataStoreClientActor - .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS); - } catch (RuntimeException e) { - LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e); - clientActor.tell(PoisonPill.getInstance(), noSender()); - return RpcResultBuilder.failed() - .withError(ErrorType.APPLICATION, "Unable to create DataStoreClient for read", e).buildFuture(); - } - - final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory(); - final ClientTransaction tx = localHistory.createTransaction(); - final ListenableFuture>> read = - tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT)); - - tx.abort(); - localHistory.close(); - try { - final java.util.Optional> optional = read.get(); - if (!optional.isPresent()) { - return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, - "data-missing", "Final read from id-ints is empty").buildFuture(); - } - - return RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder().setCopyMatches( - idIntsDdtl.checkEqual(optional.get()))).buildFuture(); - - } catch (InterruptedException | ExecutionException e) { - LOG.error("Unable to read data to verify ddtl data", e); - return RpcResultBuilder.failed() - .withError(ErrorType.APPLICATION, "Final read from id-ints failed", e).buildFuture(); - } finally { - distributedDataStoreClient.close(); - clientActor.tell(PoisonPill.getInstance(), noSender()); - } + throw new UnsupportedOperationException(); } } diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/IdIntsDOMDataTreeLIstener.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/IdIntsDOMDataTreeLIstener.java deleted file mode 100644 index 3c88f99175..0000000000 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/IdIntsDOMDataTreeLIstener.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.clustering.it.provider.impl; - -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.SettableFuture; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeListener; -import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Deprecated(forRemoval = true) -public class IdIntsDOMDataTreeLIstener implements DOMDataTreeListener { - - private static final Logger LOG = LoggerFactory.getLogger(IdIntsDOMDataTreeLIstener.class); - private static final long SECOND_AS_NANO = 1000000000; - - private NormalizedNode localCopy = null; - private final AtomicLong lastNotifTimestamp = new AtomicLong(0); - private ScheduledFuture scheduledFuture; - private ScheduledExecutorService executorService; - - @Override - public void onDataTreeChanged(final Collection changes, - final Map> subtrees) { - - // There should only be one candidate reported - Preconditions.checkState(changes.size() == 1); - - lastNotifTimestamp.set(System.nanoTime()); - - // do not log the change into debug, only use trace since it will lead to OOM on default heap settings - LOG.debug("Received data tree changed"); - - changes.forEach(change -> { - if (change.getRootNode().getDataAfter().isPresent()) { - LOG.trace("Received change, data before: {}, data after: {}", - change.getRootNode().getDataBefore().isPresent() - ? change.getRootNode().getDataBefore().get() : "", - change.getRootNode().getDataAfter().get()); - - if (localCopy == null || checkEqual(change.getRootNode().getDataBefore().get())) { - localCopy = change.getRootNode().getDataAfter().get(); - } else { - LOG.warn("Ignoring notification."); - LOG.trace("Ignored notification content: {}", change); - } - } else { - LOG.warn("getDataAfter() is missing from notification. change: {}", change); - } - }); - } - - @Override - public void onDataTreeFailed(final Collection causes) { - - } - - public boolean hasTriggered() { - return localCopy != null; - } - - public Future tryFinishProcessing() { - executorService = Executors.newSingleThreadScheduledExecutor(); - final SettableFuture settableFuture = SettableFuture.create(); - - scheduledFuture = executorService.scheduleAtFixedRate(new CheckFinishedTask(settableFuture), - 0, 1, TimeUnit.SECONDS); - return settableFuture; - } - - public boolean checkEqual(final NormalizedNode expected) { - return localCopy.equals(expected); - } - - private class CheckFinishedTask implements Runnable { - - private final SettableFuture future; - - CheckFinishedTask(final SettableFuture future) { - this.future = future; - } - - @Override - public void run() { - if (System.nanoTime() - lastNotifTimestamp.get() > SECOND_AS_NANO * 4) { - scheduledFuture.cancel(false); - future.set(null); - - executorService.shutdown(); - } - } - } -} diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixLeaderHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixLeaderHandler.java deleted file mode 100644 index d808b376be..0000000000 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixLeaderHandler.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.clustering.it.provider.impl; - -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import java.util.Collections; -import java.util.concurrent.CompletionStage; -import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; -import org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer; -import org.opendaylight.controller.cluster.dom.api.CDSShardAccess; -import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException; -import org.opendaylight.mdsal.dom.api.DOMDataTreeService; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutputBuilder; -import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Deprecated(forRemoval = true) -public class PrefixLeaderHandler { - - private static final Logger LOG = LoggerFactory.getLogger(PrefixLeaderHandler.class); - - private final DOMDataTreeService domDataTreeService; - private final BindingNormalizedNodeSerializer serializer; - - public PrefixLeaderHandler(final DOMDataTreeService domDataTreeService, - final BindingNormalizedNodeSerializer serializer) { - this.domDataTreeService = domDataTreeService; - this.serializer = serializer; - } - - public ListenableFuture> makeLeaderLocal(final BecomePrefixLeaderInput input) { - - final YangInstanceIdentifier yid = serializer.toYangInstanceIdentifier(input.getPrefix()); - final DOMDataTreeIdentifier prefix = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, yid); - - try (CDSDataTreeProducer producer = - (CDSDataTreeProducer) domDataTreeService.createProducer(Collections.singleton(prefix))) { - - final CDSShardAccess shardAccess = producer.getShardAccess(prefix); - - final CompletionStage completionStage = shardAccess.makeLeaderLocal(); - - completionStage.exceptionally(throwable -> { - LOG.error("Leader movement failed.", throwable); - return null; - }); - } catch (final DOMDataTreeProducerException e) { - LOG.warn("Error while closing producer", e); - } catch (final TimeoutException e) { - LOG.warn("Timeout while on producer operation", e); - Futures.immediateFuture(RpcResultBuilder.failed().withError(RpcError.ErrorType.RPC, - "resource-denied-transport", "Timeout while opening producer please retry.", "clustering-it", - "clustering-it", e)); - } - - return Futures.immediateFuture(RpcResultBuilder.success(new BecomePrefixLeaderOutputBuilder().build()).build()); - } -} diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java deleted file mode 100644 index 58666ecd0a..0000000000 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.clustering.it.provider.impl; - -import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ID; -import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ID_INT; -import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ID_INTS; -import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ITEM; - -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.CompletionStage; -import java.util.stream.Collectors; -import org.opendaylight.controller.cluster.access.concepts.MemberName; -import org.opendaylight.controller.cluster.sharding.DistributedShardFactory; -import org.opendaylight.controller.cluster.sharding.DistributedShardRegistration; -import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; -import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException; -import org.opendaylight.mdsal.dom.api.DOMDataTreeService; -import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException; -import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutputBuilder; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutputBuilder; -import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; -import org.opendaylight.yangtools.yang.data.api.schema.MapNode; -import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Deprecated(forRemoval = true) -public class PrefixShardHandler { - - private static final Logger LOG = LoggerFactory.getLogger(PrefixShardHandler.class); - private static final int MAX_PREFIX = 4; - private static final String PREFIX_TEMPLATE = "prefix-"; - - private final DistributedShardFactory shardFactory; - private final DOMDataTreeService domDataTreeService; - private final BindingNormalizedNodeSerializer serializer; - - private final Map registrations = - Collections.synchronizedMap(new HashMap<>()); - - public PrefixShardHandler(final DistributedShardFactory shardFactory, - final DOMDataTreeService domDataTreeService, - final BindingNormalizedNodeSerializer serializer) { - - this.shardFactory = shardFactory; - this.domDataTreeService = domDataTreeService; - this.serializer = serializer; - } - - public ListenableFuture> onCreatePrefixShard( - final CreatePrefixShardInput input) { - - final SettableFuture> future = SettableFuture.create(); - - final CompletionStage completionStage; - final YangInstanceIdentifier identifier = serializer.toYangInstanceIdentifier(input.getPrefix()); - - try { - completionStage = shardFactory.createDistributedShard( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, identifier), - input.getReplicas().stream().map(MemberName::forName).collect(Collectors.toList())); - - completionStage.thenAccept(registration -> { - LOG.debug("Shard[{}] created successfully.", identifier); - registrations.put(identifier, registration); - - final ListenableFuture ensureFuture = ensureListExists(); - Futures.addCallback(ensureFuture, new FutureCallback() { - @Override - public void onSuccess(final Object result) { - LOG.debug("Initial list write successful."); - future.set(RpcResultBuilder.success(new CreatePrefixShardOutputBuilder().build()).build()); - } - - @Override - public void onFailure(final Throwable throwable) { - LOG.warn("Shard[{}] creation failed:", identifier, throwable); - - final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, - "create-shard-failed", "Shard creation failed", "cluster-test-app", "", throwable); - future.set(RpcResultBuilder.failed().withRpcError(error).build()); - } - }, MoreExecutors.directExecutor()); - }); - completionStage.exceptionally(throwable -> { - LOG.warn("Shard[{}] creation failed:", identifier, throwable); - - final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "create-shard-failed", - "Shard creation failed", "cluster-test-app", "", throwable); - future.set(RpcResultBuilder.failed().withRpcError(error).build()); - return null; - }); - } catch (final DOMDataTreeShardingConflictException e) { - LOG.warn("Unable to register shard for: {}.", identifier); - - final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "create-shard-failed", - "Sharding conflict", "cluster-test-app", "", e); - future.set(RpcResultBuilder.failed().withRpcError(error).build()); - } - - return future; - } - - public ListenableFuture> onRemovePrefixShard( - final RemovePrefixShardInput input) { - - final YangInstanceIdentifier identifier = serializer.toYangInstanceIdentifier(input.getPrefix()); - final DistributedShardRegistration registration = registrations.get(identifier); - - if (registration == null) { - final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "registration-missing", - "No shard registered at this prefix."); - return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error) - .build()); - } - - final SettableFuture> future = SettableFuture.create(); - - final CompletionStage close = registration.close(); - close.thenRun(() -> future.set(RpcResultBuilder.success(new RemovePrefixShardOutputBuilder().build()).build())); - close.exceptionally(throwable -> { - LOG.warn("Shard[{}] removal failed:", identifier, throwable); - - final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "remove-shard-failed", - "Shard removal failed", "cluster-test-app", "", throwable); - future.set(RpcResultBuilder.failed().withRpcError(error).build()); - return null; - }); - - return future; - } - - private ListenableFuture ensureListExists() { - - final CollectionNodeBuilder mapBuilder = ImmutableNodes.mapNodeBuilder(ID_INT); - - // hardcoded initial list population for parallel produce-transactions testing on multiple nodes - for (int i = 1; i < MAX_PREFIX; i++) { - mapBuilder.withChild( - ImmutableNodes.mapEntryBuilder(ID_INT, ID, PREFIX_TEMPLATE + i) - .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build()) - .build()); - } - final MapNode mapNode = mapBuilder.build(); - - final ContainerNode containerNode = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(ID_INTS)) - .withChild(mapNode) - .build(); - - final DOMDataTreeProducer producer = domDataTreeService.createProducer(Collections.singleton( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty()))); - - final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(false); - - final DOMDataTreeWriteCursor cursor = - tx.createCursor(new DOMDataTreeIdentifier( - LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty())); - - cursor.merge(containerNode.getIdentifier(), containerNode); - cursor.close(); - - final ListenableFuture future = tx.commit(); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(final Object result) { - try { - LOG.debug("Closing producer for initial list."); - producer.close(); - } catch (DOMDataTreeProducerException e) { - LOG.warn("Error while closing producer.", e); - } - } - - @Override - public void onFailure(final Throwable throwable) { - //NOOP handled by the caller of this method. - } - }, MoreExecutors.directExecutor()); - return future; - } -} diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java deleted file mode 100644 index fc1c9f5545..0000000000 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.clustering.it.provider.impl; - -import static java.util.Objects.requireNonNull; - -import com.google.common.util.concurrent.FluentFuture; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.SplittableRandom; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import org.eclipse.jdt.annotation.NonNull; -import org.opendaylight.mdsal.common.api.CommitInfo; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; -import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException; -import org.opendaylight.mdsal.dom.api.DOMDataTreeService; -import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutputBuilder; -import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; -import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; -import org.opendaylight.yangtools.yang.data.api.schema.MapNode; -import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Deprecated(forRemoval = true) -public final class ProduceTransactionsHandler extends AbstractTransactionHandler { - private static final Logger LOG = LoggerFactory.getLogger(ProduceTransactionsHandler.class); - - private final SettableFuture> future = SettableFuture.create(); - private final SplittableRandom random = new SplittableRandom(); - private final Set usedValues = new HashSet<>(); - private final DOMDataTreeIdentifier idListItem; - private final DOMDataTreeProducer itemProducer; - - private long insertTx = 0; - private long deleteTx = 0; - - private ProduceTransactionsHandler(final DOMDataTreeProducer producer, final DOMDataTreeIdentifier idListItem, - final ProduceTransactionsInput input) { - super(input); - this.itemProducer = requireNonNull(producer); - this.idListItem = requireNonNull(idListItem); - } - - public static ListenableFuture> start( - final DOMDataTreeService domDataTreeService, final ProduceTransactionsInput input) { - final String id = input.getId(); - LOG.debug("Filling the item list {} with initial values.", id); - - final YangInstanceIdentifier idListWithKey = ID_INT_YID.node(NodeIdentifierWithPredicates.of(ID_INT, ID, id)); - - final DOMDataTreeProducer itemProducer = domDataTreeService.createProducer( - Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey))); - - final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false); - final DOMDataTreeWriteCursor cursor = - tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)); - - final MapNode list = ImmutableNodes.mapNodeBuilder(ITEM).build(); - cursor.write(list.getIdentifier(), list); - cursor.close(); - - try { - tx.commit().get(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - LOG.warn("Unable to fill the initial item list.", e); - closeProducer(itemProducer); - - return Futures.immediateFuture(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); - } - - final ProduceTransactionsHandler handler = new ProduceTransactionsHandler(itemProducer, - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(list.getIdentifier()) - .toOptimized()), input); - // It is handler's responsibility to close itemProducer when the work is finished. - handler.doStart(); - return handler.future; - } - - private static void closeProducer(final DOMDataTreeProducer producer) { - try { - producer.close(); - } catch (final DOMDataTreeProducerException exception) { - LOG.warn("Failure while closing producer.", exception); - } - } - - @Override - FluentFuture execWrite(final long txId) { - final int i = random.nextInt(MAX_ITEM + 1); - final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false); - final DOMDataTreeWriteCursor cursor = tx.createCursor(idListItem); - - final NodeIdentifierWithPredicates entryId = NodeIdentifierWithPredicates.of(ITEM, NUMBER, i); - if (usedValues.contains(i)) { - LOG.debug("Deleting item: {}", i); - deleteTx++; - cursor.delete(entryId); - usedValues.remove(i); - - } else { - LOG.debug("Inserting item: {}", i); - insertTx++; - - final MapEntryNode entry = ImmutableNodes.mapEntryBuilder().withNodeIdentifier(entryId) - .withChild(ImmutableNodes.leafNode(NUMBER, i)).build(); - cursor.write(entryId, entry); - usedValues.add(i); - } - - cursor.close(); - - return tx.commit(); - } - - @Override - void runFailed(final Throwable cause, final long txId) { - closeProducer(itemProducer); - future.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Commit failed for tx # " + txId, cause).build()); - } - - @Override - void runSuccessful(final long allTx) { - closeProducer(itemProducer); - final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder() - .setAllTx(allTx) - .setInsertTx(insertTx) - .setDeleteTx(deleteTx) - .build(); - future.set(RpcResultBuilder.success() - .withResult(output).build()); - } - - @Override - void runTimedOut(final String cause) { - closeProducer(itemProducer); - future.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, cause).build()); - } -} diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/resources/OSGI-INF/blueprint/cluster-test-app.xml b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/resources/OSGI-INF/blueprint/cluster-test-app.xml index eccae7a7d1..aa7d4032a8 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/resources/OSGI-INF/blueprint/cluster-test-app.xml +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/resources/OSGI-INF/blueprint/cluster-test-app.xml @@ -14,8 +14,6 @@ - - @@ -62,8 +60,6 @@ - - -- 2.36.6