From 5a0edd493bafc365647bc6311b4b7da86a78645d Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Fri, 25 Nov 2016 13:55:13 +0100 Subject: [PATCH] BUG-5280: split DistributedDataStore Split the DistributedDataStore into two components into an abstract base class and concretization running with TransactionProxies. Add another concretization, which uses DataStoreClient to instantiate requests. Change-Id: I454eec76d54c2fd4e4ea1e5cd16d12398eec81f0 Signed-off-by: Robert Varga --- .../admin/ClusterAdminRpcServiceTest.java | 26 +- .../databroker/ClientBackedDataStore.java | 59 ++++ .../ClientBackedReadTransaction.java | 60 ++++ .../ClientBackedReadWriteTransaction.java | 40 +++ .../databroker/ClientBackedTransaction.java | 74 +++++ .../ClientBackedTransactionChain.java | 93 ++++++ .../ClientBackedWriteTransaction.java | 46 +++ ...DOMStoreThreePhaseCommitCohortAdaptor.java | 53 ++++ .../ShardedDOMStoreReadTransaction.java | 45 --- .../ShardedDOMStoreReadWriteTransaction.java | 47 --- .../ShardedDOMStoreTransactionChain.java | 50 --- .../ShardedDOMStoreWriteTransaction.java | 51 --- .../cluster/datastore/AbstractDataStore.java | 290 ++++++++++++++++++ .../cluster/datastore/DatastoreContext.java | 11 + .../datastore/DistributedDataStore.java | 262 +--------------- .../DistributedDataStoreFactory.java | 15 +- ...tributedConfigDataStoreProviderModule.java | 1 + ...tedOperationalDataStoreProviderModule.java | 1 + .../yang/distributed-datastore-provider.yang | 143 +++++---- .../DataTreeCohortIntegrationTest.java | 6 +- .../DistributedDataStoreIntegrationTest.java | 44 +-- ...butedDataStoreRemotingIntegrationTest.java | 30 +- .../cluster/datastore/IntegrationTestKit.java | 43 ++- .../cluster/datastore/MemberNode.java | 40 +-- ...ributedEntityOwnershipIntegrationTest.java | 24 +- ...DistributedEntityOwnershipServiceTest.java | 3 +- 26 files changed, 930 insertions(+), 627 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedDataStore.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedReadTransaction.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedReadWriteTransaction.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedTransaction.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedTransactionChain.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedWriteTransaction.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMStoreThreePhaseCommitCohortAdaptor.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadTransaction.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadWriteTransaction.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreTransactionChain.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreWriteTransaction.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java index 07acc18dc5..113c9faf35 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -47,8 +47,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.DatastoreContext; -import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.MemberNode; import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier; import org.opendaylight.controller.cluster.datastore.Shard; @@ -246,8 +246,8 @@ public class ClusterAdminRpcServiceTest { verifyFailedRpcResult(rpcResult); } - private static NormalizedNode writeCarsNodeAndVerify(DistributedDataStore writeToStore, - DistributedDataStore readFromStore) throws Exception { + private static NormalizedNode writeCarsNodeAndVerify(AbstractDataStore writeToStore, + AbstractDataStore readFromStore) throws Exception { DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction(); NormalizedNode carsNode = CarsModel.create(); writeTx.write(CarsModel.BASE_PATH, carsNode); @@ -262,7 +262,7 @@ public class ClusterAdminRpcServiceTest { return carsNode; } - private static void readCarsNodeAndVerify(DistributedDataStore readFromStore, + private static void readCarsNodeAndVerify(AbstractDataStore readFromStore, NormalizedNode expCarsNode) throws Exception { Optional> optional = readFromStore.newReadOnlyTransaction() .read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS); @@ -636,7 +636,7 @@ public class ClusterAdminRpcServiceTest { successShardResult("cars", DataStoreType.Operational), successShardResult("people", DataStoreType.Operational)); - verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), + verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), replicaNode2.configDataStore(), replicaNode2.operDataStore(), replicaNode3.configDataStore(), replicaNode3.operDataStore()}, new String[]{"cars", "people"}, new SimpleEntry<>("member-1", true), @@ -685,7 +685,7 @@ public class ClusterAdminRpcServiceTest { successShardResult("cars", DataStoreType.Operational), successShardResult("people", DataStoreType.Operational)); - verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), + verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), replicaNode2.configDataStore(), replicaNode2.operDataStore(), replicaNode3.configDataStore(), replicaNode3.operDataStore()}, new String[]{"cars", "people"}, @@ -714,7 +714,7 @@ public class ClusterAdminRpcServiceTest { successShardResult("cars", DataStoreType.Operational), successShardResult("people", DataStoreType.Operational)); - verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), + verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), replicaNode2.configDataStore(), replicaNode2.operDataStore(), replicaNode3.configDataStore(), replicaNode3.operDataStore()}, new String[]{"cars", "people"}, @@ -779,7 +779,7 @@ public class ClusterAdminRpcServiceTest { successShardResult("cars", DataStoreType.Operational), successShardResult("people", DataStoreType.Operational)); - verifyVotingStates(new DistributedDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(), + verifyVotingStates(new AbstractDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(), replicaNode2.configDataStore(), replicaNode2.operDataStore(), replicaNode3.configDataStore(), replicaNode3.operDataStore()}, new String[]{"cars", "people"}, @@ -847,7 +847,7 @@ public class ClusterAdminRpcServiceTest { successShardResult("people", DataStoreType.Operational)); // Members 2 and 3 are now non-voting but should get replicated with the new new server config. - verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), + verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), replicaNode2.configDataStore(), replicaNode2.operDataStore(), replicaNode3.configDataStore(), replicaNode3.operDataStore()}, new String[]{"cars", "people"}, @@ -863,7 +863,7 @@ public class ClusterAdminRpcServiceTest { }); } - private void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig, + private static void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig, String member, String datastoreTypeSuffix, String... shards) { String[] datastoreTypes = {"config_", "oper_"}; for (String type : datastoreTypes) { @@ -884,9 +884,9 @@ public class ClusterAdminRpcServiceTest { } @SafeVarargs - private static void verifyVotingStates(DistributedDataStore[] datastores, String[] shards, + private static void verifyVotingStates(AbstractDataStore[] datastores, String[] shards, SimpleEntry... expStates) throws Exception { - for (DistributedDataStore datastore: datastores) { + for (AbstractDataStore datastore: datastores) { for (String shard: shards) { verifyVotingStates(datastore, shard, expStates); } @@ -894,7 +894,7 @@ public class ClusterAdminRpcServiceTest { } @SafeVarargs - private static void verifyVotingStates(DistributedDataStore datastore, String shardName, + private static void verifyVotingStates(AbstractDataStore datastore, String shardName, SimpleEntry... expStates) throws Exception { String localMemberName = datastore.getActorContext().getCurrentMemberName().getName(); Map expStateMap = new HashMap<>(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedDataStore.java new file mode 100644 index 0000000000..7d16966f04 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedDataStore.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker; + +import akka.actor.ActorSystem; +import com.google.common.annotations.VisibleForTesting; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.datastore.AbstractDataStore; +import org.opendaylight.controller.cluster.datastore.ClusterWrapper; +import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; +import org.opendaylight.controller.cluster.datastore.config.Configuration; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; + +/** + * Implements a distributed DOMStore using ClientActor. + */ +public class ClientBackedDataStore extends AbstractDataStore { + + public ClientBackedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster, + final Configuration configuration, final DatastoreContextFactory datastoreContextFactory, + final DatastoreSnapshot restoreFromSnapshot) { + super(actorSystem, cluster, configuration, datastoreContextFactory, restoreFromSnapshot); + } + + @VisibleForTesting + ClientBackedDataStore(final ActorContext actorContext, final ClientIdentifier identifier) { + super(actorContext, identifier); + } + + @Override + public DOMStoreTransactionChain createTransactionChain() { + return new ClientBackedTransactionChain(getClient().createLocalHistory()); + } + + @Override + public DOMStoreReadTransaction newReadOnlyTransaction() { + return new ClientBackedReadTransaction(getClient().createSnapshot(), null); + } + + @Override + public DOMStoreWriteTransaction newWriteOnlyTransaction() { + return new ClientBackedWriteTransaction(getClient().createTransaction()); + } + + @Override + public DOMStoreReadWriteTransaction newReadWriteTransaction() { + return new ClientBackedReadWriteTransaction(getClient().createTransaction()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedReadTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedReadTransaction.java new file mode 100644 index 0000000000..3d10c39760 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedReadTransaction.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientSnapshot; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +/** + * An implementation of {@link DOMStoreReadTransaction} backed by a {@link ClientSnapshot}. Used for standalone + * transactions. + * + * @author Robert Varga + */ +final class ClientBackedReadTransaction extends ClientBackedTransaction + implements DOMStoreReadTransaction { + private static final AtomicReferenceFieldUpdater + PARENT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ClientBackedReadTransaction.class, + ClientBackedTransactionChain.class, "parent"); + + @SuppressWarnings("unused") + private volatile ClientBackedTransactionChain parent; + + ClientBackedReadTransaction(final ClientSnapshot delegate, @Nullable final ClientBackedTransactionChain parent) { + super(delegate); + this.parent = parent; + } + + @Override + public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { + return Futures.makeChecked(delegate().read(path), ReadFailedException.MAPPER); + } + + @Override + public CheckedFuture exists(final YangInstanceIdentifier path) { + return Futures.makeChecked(delegate().exists(path), ReadFailedException.MAPPER); + } + + @Override + public void close() { + super.close(); + + final ClientBackedTransactionChain local = PARENT_UPDATER.getAndSet(this, null); + if (local != null) { + local.snapshotClosed(delegate()); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedReadWriteTransaction.java new file mode 100644 index 0000000000..a1bbe61895 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedReadWriteTransaction.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +/** + * An implementation of {@link DOMStoreReadWriteTransaction} backed by a {@link ClientTransaction}. + * + * @author Robert Varga + */ +final class ClientBackedReadWriteTransaction extends ClientBackedWriteTransaction + implements DOMStoreReadWriteTransaction { + + ClientBackedReadWriteTransaction(final ClientTransaction delegate) { + super(delegate); + } + + @Override + public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { + return Futures.makeChecked(delegate().read(path), ReadFailedException.MAPPER); + } + + @Override + public CheckedFuture exists(final YangInstanceIdentifier path) { + return Futures.makeChecked(delegate().exists(path), ReadFailedException.MAPPER); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedTransaction.java new file mode 100644 index 0000000000..6c14297739 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedTransaction.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.FinalizablePhantomReference; +import com.google.common.base.FinalizableReferenceQueue; +import com.google.common.base.Preconditions; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.databroker.actors.dds.AbstractClientHandle; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; +import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An implementation of {@link DOMStoreTransaction} backed by a {@link ClientTransaction}. It guards against user-level + * leaks by maintaining a phantom reference on the backing transaction, which will ensure that the transaction will + * be aborted, if it is not otherwise closed, just before this object is garbage-collected. + * + * @author Robert Varga + */ +abstract class ClientBackedTransaction> extends + AbstractDOMStoreTransaction { + private static final class Finalizer extends FinalizablePhantomReference> { + private static final FinalizableReferenceQueue QUEUE = new FinalizableReferenceQueue(); + private static final Set FINALIZERS = ConcurrentHashMap.newKeySet(); + private static final Logger LOG = LoggerFactory.getLogger(Finalizer.class); + + private final AbstractClientHandle transaction; + + private Finalizer(final ClientBackedTransaction referent, final AbstractClientHandle transaction) { + super(referent, QUEUE); + this.transaction = Preconditions.checkNotNull(transaction); + } + + static @Nonnull > T recordTransaction( + @Nonnull final ClientBackedTransaction referent, @Nonnull final T transaction) { + FINALIZERS.add(new Finalizer(referent, transaction)); + return transaction; + } + + @Override + public void finalizeReferent() { + FINALIZERS.remove(this); + if (transaction.abort()) { + LOG.warn("Aborted orphan transaction {}", transaction.getIdentifier()); + } + } + } + + private final T delegate; + + ClientBackedTransaction(final T delegate) { + super(delegate.getIdentifier()); + this.delegate = Finalizer.recordTransaction(this, delegate); + } + + final T delegate() { + return delegate; + } + + @Override + public void close() { + delegate.abort(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedTransactionChain.java new file mode 100644 index 0000000000..db30372676 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedTransactionChain.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.Preconditions; +import java.util.Map; +import java.util.WeakHashMap; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.databroker.actors.dds.AbstractClientHandle; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientSnapshot; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An implementation of {@link DOMStoreTransactionChain} backed by a {@link ClientLocalHistory}. + * + * @author Robert Varga + */ +final class ClientBackedTransactionChain implements DOMStoreTransactionChain { + private static final Logger LOG = LoggerFactory.getLogger(ClientBackedTransactionChain.class); + + @GuardedBy("this") + private final Map, Boolean> openSnapshots = new WeakHashMap<>(); + + private final ClientLocalHistory history; + + ClientBackedTransactionChain(final ClientLocalHistory history) { + this.history = Preconditions.checkNotNull(history); + } + + @Override + public DOMStoreReadTransaction newReadOnlyTransaction() { + return new ClientBackedReadTransaction(createSnapshot(), this); + } + + @Override + public DOMStoreReadWriteTransaction newReadWriteTransaction() { + return new ClientBackedReadWriteTransaction(createTransaction()); + } + + @Override + public DOMStoreWriteTransaction newWriteOnlyTransaction() { + return new ClientBackedWriteTransaction(createTransaction()); + } + + @Override + public synchronized void close() { + for (AbstractClientHandle snap : openSnapshots.keySet()) { + LOG.warn("Aborting unclosed transaction {}", snap.getIdentifier()); + snap.abort(); + } + openSnapshots.clear(); + + history.close(); + } + + synchronized void snapshotClosed(final ClientSnapshot clientTransaction) { + openSnapshots.remove(clientTransaction); + } + + private ClientSnapshot createSnapshot() { + try { + return recordSnapshot(history.takeSnapshot()); + } catch (org.opendaylight.mdsal.common.api.TransactionChainClosedException e) { + throw new TransactionChainClosedException("Transaction chain has been closed", e); + } + } + + private ClientTransaction createTransaction() { + try { + return recordSnapshot(history.createTransaction()); + } catch (org.opendaylight.mdsal.common.api.TransactionChainClosedException e) { + throw new TransactionChainClosedException("Transaction chain has been closed", e); + } + } + + private synchronized > T recordSnapshot(final T snapshot) { + openSnapshots.put(snapshot, Boolean.TRUE); + return snapshot; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedWriteTransaction.java new file mode 100644 index 0000000000..27a671d3c3 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedWriteTransaction.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker; + +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +/** + * An implementation of {@link DOMStoreWriteTransaction} backed by a {@link ClientTransaction}. + * + * @author Robert Varga + */ +class ClientBackedWriteTransaction extends ClientBackedTransaction + implements DOMStoreWriteTransaction { + ClientBackedWriteTransaction(final ClientTransaction delegate) { + super(delegate); + } + + @Override + public final void write(final YangInstanceIdentifier path, final NormalizedNode data) { + delegate().write(path, data); + } + + @Override + public final void merge(final YangInstanceIdentifier path, final NormalizedNode data) { + delegate().merge(path, data); + } + + @Override + public final void delete(final YangInstanceIdentifier path) { + delegate().delete(path); + } + + @Override + public final DOMStoreThreePhaseCommitCohort ready() { + return new DOMStoreThreePhaseCommitCohortAdaptor(delegate().ready()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMStoreThreePhaseCommitCohortAdaptor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMStoreThreePhaseCommitCohortAdaptor.java new file mode 100644 index 0000000000..e9a5a7dce5 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMStoreThreePhaseCommitCohortAdaptor.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ForwardingObject; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; + +/** + * Utility class from bridging {@link DOMStoreThreePhaseCommitCohort} and + * {@link org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort}. + * + * @author Robert Varga + */ +final class DOMStoreThreePhaseCommitCohortAdaptor extends ForwardingObject implements DOMStoreThreePhaseCommitCohort { + private final org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort delegate; + + DOMStoreThreePhaseCommitCohortAdaptor( + final org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort delegate) { + this.delegate = Preconditions.checkNotNull(delegate); + } + + @Override + public ListenableFuture canCommit() { + return delegate.canCommit(); + } + + @Override + public ListenableFuture preCommit() { + return delegate.preCommit(); + } + + @Override + public ListenableFuture abort() { + return delegate.abort(); + } + + @Override + public ListenableFuture commit() { + return delegate.commit(); + } + + @Override + protected Object delegate() { + return delegate; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadTransaction.java deleted file mode 100644 index a52254e35f..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadTransaction.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.databroker; - -import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; -import org.opendaylight.mdsal.common.api.ReadFailedException; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; - -/** - * Proxy implementation of {@link DOMStoreReadTransaction}. It routes all requests to the backing - * {@link ClientTransaction}. This class is not final to allow further subclassing by - * {@link ShardedDOMStoreReadWriteTransaction}. - * - * @author Robert Varga - */ -class ShardedDOMStoreReadTransaction extends AbstractShardedTransaction implements DOMStoreReadTransaction { - ShardedDOMStoreReadTransaction(final ClientTransaction tx) { - super(tx); - } - - @Override - public final void close() { - transaction().abort(); - } - - @Override - public final CheckedFuture>, ReadFailedException> read( - final YangInstanceIdentifier path) { - return transaction().read(path); - } - - @Override - public final CheckedFuture exists(final YangInstanceIdentifier path) { - return transaction().exists(path); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadWriteTransaction.java deleted file mode 100644 index 9144f9b091..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadWriteTransaction.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.databroker; - -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; - -/** - * Proxy implementation of {@link DOMStoreReadWriteTransaction}. It routes all requests to the backing - * {@link ClientTransaction}. - * - * @author Robert Varga - */ -final class ShardedDOMStoreReadWriteTransaction extends ShardedDOMStoreReadTransaction - implements DOMStoreReadWriteTransaction { - ShardedDOMStoreReadWriteTransaction(final ClientTransaction tx) { - super(tx); - } - - @Override - public void write(final YangInstanceIdentifier path, final NormalizedNode data) { - transaction().write(path, data); - } - - @Override - public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { - transaction().merge(path, data); - } - - @Override - public void delete(final YangInstanceIdentifier path) { - transaction().delete(path); - } - - @Override - public DOMStoreThreePhaseCommitCohort ready() { - return transaction().ready(); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreTransactionChain.java deleted file mode 100644 index 6b7237e803..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreTransactionChain.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.databroker; - -import com.google.common.base.Preconditions; -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory; -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; - -/** - * Implementation of {@link DOMStoreTransactionChain} backed by a {@link ClientLocalHistory}. It wraps - * {@link ClientTransaction} into proxies like {@link ShardedDOMStoreReadTransaction} to provide isolation. - * - * @author Robert Varga - */ -final class ShardedDOMStoreTransactionChain implements DOMStoreTransactionChain { - private final ClientLocalHistory history; - - ShardedDOMStoreTransactionChain(final ClientLocalHistory history) { - this.history = Preconditions.checkNotNull(history); - } - - @Override - public DOMStoreReadTransaction newReadOnlyTransaction() { - return new ShardedDOMStoreReadTransaction(history.createTransaction()); - } - - @Override - public DOMStoreReadWriteTransaction newReadWriteTransaction() { - return new ShardedDOMStoreReadWriteTransaction(history.createTransaction()); - } - - @Override - public DOMStoreWriteTransaction newWriteOnlyTransaction() { - return new ShardedDOMStoreWriteTransaction(history.createTransaction()); - } - - @Override - public void close() { - history.close(); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreWriteTransaction.java deleted file mode 100644 index 0e13ac49f3..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreWriteTransaction.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.databroker; - -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; - -/** - * Proxy implementation of {@link DOMStoreWriteTransaction}. It routes all requests to the backing - * {@link ClientTransaction}. - * - * @author Robert Varga - */ -final class ShardedDOMStoreWriteTransaction extends AbstractShardedTransaction implements DOMStoreWriteTransaction { - ShardedDOMStoreWriteTransaction(final ClientTransaction tx) { - super(tx); - } - - @Override - public void write(final YangInstanceIdentifier path, final NormalizedNode data) { - transaction().write(path, data); - } - - @Override - public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { - transaction().merge(path, data); - } - - @Override - public void delete(final YangInstanceIdentifier path) { - transaction().delete(path); - } - - @Override - public DOMStoreThreePhaseCommitCohort ready() { - return transaction().ready(); - } - - @Override - public void close() { - transaction().abort(); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java new file mode 100644 index 0000000000..36c822cd0b --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java @@ -0,0 +1,290 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.Props; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; +import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClientActor; +import org.opendaylight.controller.cluster.datastore.config.Configuration; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXBeanImpl; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; +import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base implementation of a distributed DOMStore. + */ +public abstract class AbstractDataStore implements DistributedDataStoreInterface, SchemaContextListener, + DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, + DOMDataTreeCommitCohortRegistry, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStore.class); + + private static final long READY_WAIT_FACTOR = 3; + + private final ActorContext actorContext; + private final long waitTillReadyTimeInMillis; + + private AutoCloseable closeable; + + private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean; + + private DatastoreInfoMXBeanImpl datastoreInfoMXBean; + + private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1); + + private final ClientIdentifier identifier; + private final DataStoreClient client; + + @SuppressWarnings("checkstyle:IllegalCatch") + protected AbstractDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster, + final Configuration configuration, final DatastoreContextFactory datastoreContextFactory, + final DatastoreSnapshot restoreFromSnapshot) { + Preconditions.checkNotNull(actorSystem, "actorSystem should not be null"); + Preconditions.checkNotNull(cluster, "cluster should not be null"); + Preconditions.checkNotNull(configuration, "configuration should not be null"); + Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null"); + + String shardManagerId = ShardManagerIdentifier.builder() + .type(datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()).build().toString(); + + LOG.info("Creating ShardManager : {}", shardManagerId); + + String shardDispatcher = + new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); + + PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); + + ShardManagerCreator creator = new ShardManagerCreator().cluster(cluster).configuration(configuration) + .datastoreContextFactory(datastoreContextFactory) + .waitTillReadyCountDownLatch(waitTillReadyCountDownLatch) + .primaryShardInfoCache(primaryShardInfoCache) + .restoreFromSnapshot(restoreFromSnapshot); + + actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher, + shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(), + primaryShardInfoCache); + + final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(), + datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(), actorContext); + final ActorRef clientActor = actorSystem.actorOf(clientProps); + try { + client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.error("Failed to get actor for {}", clientProps, e); + clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + throw Throwables.propagate(e); + } + + identifier = client.getIdentifier(); + LOG.debug("Distributed data store client {} started", identifier); + + this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout() + .duration().toMillis() * READY_WAIT_FACTOR; + + datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl( + datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType()); + datastoreConfigMXBean.setContext(datastoreContextFactory.getBaseDatastoreContext()); + datastoreConfigMXBean.registerMBean(); + + datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContextFactory.getBaseDatastoreContext() + .getDataStoreMXBeanType(), actorContext); + datastoreInfoMXBean.registerMBean(); + } + + @VisibleForTesting + protected AbstractDataStore(final ActorContext actorContext, final ClientIdentifier identifier) { + this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null"); + this.client = null; + this.identifier = Preconditions.checkNotNull(identifier); + this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout() + .duration().toMillis() * READY_WAIT_FACTOR; + } + + protected final DataStoreClient getClient() { + return client; + } + + final ClientIdentifier getIdentifier() { + return identifier; + } + + public void setCloseable(final AutoCloseable closeable) { + this.closeable = closeable; + } + + @SuppressWarnings("unchecked") + @Override + public >> + ListenerRegistration registerChangeListener( + final YangInstanceIdentifier path, final L listener, + final AsyncDataBroker.DataChangeScope scope) { + + Preconditions.checkNotNull(path, "path should not be null"); + Preconditions.checkNotNull(listener, "listener should not be null"); + + LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope); + + String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path); + + final DataChangeListenerRegistrationProxy listenerRegistrationProxy = + new DataChangeListenerRegistrationProxy(shardName, actorContext, listener); + listenerRegistrationProxy.init(path, scope); + + return listenerRegistrationProxy; + } + + @Override + public ListenerRegistration registerTreeChangeListener( + final YangInstanceIdentifier treeId, final L listener) { + Preconditions.checkNotNull(treeId, "treeId should not be null"); + Preconditions.checkNotNull(listener, "listener should not be null"); + + final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId); + LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName); + + final DataTreeChangeListenerProxy listenerRegistrationProxy = + new DataTreeChangeListenerProxy<>(actorContext, listener); + listenerRegistrationProxy.init(shardName, treeId); + + return listenerRegistrationProxy; + } + + + @Override + public DOMDataTreeCommitCohortRegistration registerCommitCohort( + final DOMDataTreeIdentifier subtree, final C cohort) { + YangInstanceIdentifier treeId = + Preconditions.checkNotNull(subtree, "subtree should not be null").getRootIdentifier(); + Preconditions.checkNotNull(cohort, "listener should not be null"); + + + final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId); + LOG.debug("Registering cohort: {} for tree: {} shard: {}", cohort, treeId, shardName); + + DataTreeCohortRegistrationProxy cohortProxy = + new DataTreeCohortRegistrationProxy<>(actorContext, subtree, cohort); + cohortProxy.init(shardName); + return cohortProxy; + } + + @Override + public void onGlobalContextUpdated(final SchemaContext schemaContext) { + actorContext.setSchemaContext(schemaContext); + } + + @Override + public void onDatastoreContextUpdated(final DatastoreContextFactory contextFactory) { + LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreName()); + + actorContext.setDatastoreContext(contextFactory); + datastoreConfigMXBean.setContext(contextFactory.getBaseDatastoreContext()); + } + + @Override + @SuppressWarnings("checkstyle:IllegalCatch") + public void close() { + LOG.info("Closing data store {}", identifier); + + if (datastoreConfigMXBean != null) { + datastoreConfigMXBean.unregisterMBean(); + } + if (datastoreInfoMXBean != null) { + datastoreInfoMXBean.unregisterMBean(); + } + + if (closeable != null) { + try { + closeable.close(); + } catch (Exception e) { + LOG.debug("Error closing instance", e); + } + } + + actorContext.shutdown(); + + if (client != null) { + client.close(); + } + } + + @Override + public ActorContext getActorContext() { + return actorContext; + } + + public void waitTillReady() { + LOG.info("Beginning to wait for data store to become ready : {}", identifier); + + try { + if (waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS)) { + LOG.debug("Data store {} is now ready", identifier); + } else { + LOG.error("Shard leaders failed to settle in {} seconds, giving up", + TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis)); + } + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for shards to settle", e); + } + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private static ActorRef createShardManager(final ActorSystem actorSystem, final ShardManagerCreator creator, + final String shardDispatcher, final String shardManagerId) { + Exception lastException = null; + + for (int i = 0; i < 100; i++) { + try { + return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher).withMailbox( + ActorContext.BOUNDED_MAILBOX), shardManagerId); + } catch (Exception e) { + lastException = e; + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + LOG.debug("Could not create actor {} because of {} - waiting for sometime before retrying " + + "(retry count = {})", shardManagerId, e.getMessage(), i); + } + } + + throw new IllegalStateException("Failed to create Shard Manager", lastException); + } + + @VisibleForTesting + public CountDownLatch getWaitTillReadyCountDownLatch() { + return waitTillReadyCountDownLatch; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index a81806bc10..e889f0a9b3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -76,6 +76,7 @@ public class DatastoreContext { private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT; private boolean writeOnlyTransactionOptimizationsEnabled = true; private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS; + private boolean useTellBasedProtocol = false; private boolean transactionDebugContextEnabled = false; private String shardManagerPersistenceId; @@ -113,6 +114,7 @@ public class DatastoreContext { this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis; this.transactionDebugContextEnabled = other.transactionDebugContextEnabled; this.shardManagerPersistenceId = other.shardManagerPersistenceId; + this.useTellBasedProtocol = other.useTellBasedProtocol; setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize()); setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount()); @@ -258,6 +260,10 @@ public class DatastoreContext { return transactionDebugContextEnabled; } + public boolean isUseTellBasedProtocol() { + return useTellBasedProtocol; + } + public int getShardSnapshotChunkSize() { return raftConfig.getSnapshotChunkSize(); } @@ -471,6 +477,11 @@ public class DatastoreContext { return this; } + public Builder useTellBasedProtocol(boolean value) { + datastoreContext.useTellBasedProtocol = value; + return this; + } + /** * For unit tests only. */ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 4ae64ac582..47e5509a42 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -8,202 +8,38 @@ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.actor.PoisonPill; -import akka.actor.Props; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.util.concurrent.Uninterruptibles; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; -import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; -import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClientActor; import org.opendaylight.controller.cluster.datastore.config.Configuration; -import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; -import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl; -import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXBeanImpl; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; -import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; -import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; -import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort; -import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration; -import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * Implements a distributed DOMStore. + * Implements a distributed DOMStore using Akka Patterns.ask(). */ -public class DistributedDataStore implements DistributedDataStoreInterface, SchemaContextListener, - DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, - DOMDataTreeCommitCohortRegistry, AutoCloseable { - - private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); - - private static final long READY_WAIT_FACTOR = 3; - - private final ActorContext actorContext; - private final long waitTillReadyTimeInMillis; - - private AutoCloseable closeable; - - private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean; - - private DatastoreInfoMXBeanImpl datastoreInfoMXBean; - - private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1); - - private final ClientIdentifier identifier; - private final DataStoreClient client; +public class DistributedDataStore extends AbstractDataStore { private final TransactionContextFactory txContextFactory; - @SuppressWarnings("checkstyle:IllegalCatch") public DistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster, final Configuration configuration, final DatastoreContextFactory datastoreContextFactory, final DatastoreSnapshot restoreFromSnapshot) { - Preconditions.checkNotNull(actorSystem, "actorSystem should not be null"); - Preconditions.checkNotNull(cluster, "cluster should not be null"); - Preconditions.checkNotNull(configuration, "configuration should not be null"); - Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null"); - - String shardManagerId = ShardManagerIdentifier.builder() - .type(datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()).build().toString(); - - LOG.info("Creating ShardManager : {}", shardManagerId); - - String shardDispatcher = - new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); - - PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); - - ShardManagerCreator creator = new ShardManagerCreator().cluster(cluster).configuration(configuration) - .datastoreContextFactory(datastoreContextFactory) - .waitTillReadyCountDownLatch(waitTillReadyCountDownLatch) - .primaryShardInfoCache(primaryShardInfoCache) - .restoreFromSnapshot(restoreFromSnapshot); - - actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher, - shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(), - primaryShardInfoCache); - - final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(), - datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(), actorContext); - final ActorRef clientActor = actorSystem.actorOf(clientProps); - try { - client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS); - } catch (Exception e) { - LOG.error("Failed to get actor for {}", clientProps, e); - clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - throw Throwables.propagate(e); - } - - identifier = client.getIdentifier(); - LOG.debug("Distributed data store client {} started", identifier); - - this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout() - .duration().toMillis() * READY_WAIT_FACTOR; - - this.txContextFactory = new TransactionContextFactory(actorContext, identifier); - - datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl( - datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType()); - datastoreConfigMXBean.setContext(datastoreContextFactory.getBaseDatastoreContext()); - datastoreConfigMXBean.registerMBean(); - - datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContextFactory.getBaseDatastoreContext() - .getDataStoreMXBeanType(), actorContext); - datastoreInfoMXBean.registerMBean(); + super(actorSystem, cluster, configuration, datastoreContextFactory, restoreFromSnapshot); + this.txContextFactory = new TransactionContextFactory(getActorContext(), getIdentifier()); } @VisibleForTesting DistributedDataStore(final ActorContext actorContext, final ClientIdentifier identifier) { - this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null"); - this.client = null; - this.identifier = Preconditions.checkNotNull(identifier); - this.txContextFactory = new TransactionContextFactory(actorContext, identifier); - this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout() - .duration().toMillis() * READY_WAIT_FACTOR; - } - - public void setCloseable(final AutoCloseable closeable) { - this.closeable = closeable; - } - - @SuppressWarnings("unchecked") - @Override - public >> - ListenerRegistration registerChangeListener( - final YangInstanceIdentifier path, final L listener, - final AsyncDataBroker.DataChangeScope scope) { - - Preconditions.checkNotNull(path, "path should not be null"); - Preconditions.checkNotNull(listener, "listener should not be null"); - - LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope); - - String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path); - - final DataChangeListenerRegistrationProxy listenerRegistrationProxy = - new DataChangeListenerRegistrationProxy(shardName, actorContext, listener); - listenerRegistrationProxy.init(path, scope); - - return listenerRegistrationProxy; - } - - @Override - public ListenerRegistration registerTreeChangeListener( - final YangInstanceIdentifier treeId, final L listener) { - Preconditions.checkNotNull(treeId, "treeId should not be null"); - Preconditions.checkNotNull(listener, "listener should not be null"); - - final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId); - LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName); - - final DataTreeChangeListenerProxy listenerRegistrationProxy = - new DataTreeChangeListenerProxy<>(actorContext, listener); - listenerRegistrationProxy.init(shardName, treeId); - - return listenerRegistrationProxy; + super(actorContext, identifier); + this.txContextFactory = new TransactionContextFactory(getActorContext(), getIdentifier()); } - @Override - public DOMDataTreeCommitCohortRegistration registerCommitCohort( - final DOMDataTreeIdentifier subtree, final C cohort) { - YangInstanceIdentifier treeId = - Preconditions.checkNotNull(subtree, "subtree should not be null").getRootIdentifier(); - Preconditions.checkNotNull(cohort, "listener should not be null"); - - - final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId); - LOG.debug("Registering cohort: {} for tree: {} shard: {}", cohort, treeId, shardName); - - DataTreeCohortRegistrationProxy cohortProxy = - new DataTreeCohortRegistrationProxy<>(actorContext, subtree, cohort); - cohortProxy.init(shardName); - return cohortProxy; - } - @Override public DOMStoreTransactionChain createTransactionChain() { return txContextFactory.createTransactionChain(); @@ -216,99 +52,19 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { - actorContext.acquireTxCreationPermit(); + getActorContext().acquireTxCreationPermit(); return new TransactionProxy(txContextFactory, TransactionType.WRITE_ONLY); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { - actorContext.acquireTxCreationPermit(); + getActorContext().acquireTxCreationPermit(); return new TransactionProxy(txContextFactory, TransactionType.READ_WRITE); } @Override - public void onGlobalContextUpdated(final SchemaContext schemaContext) { - actorContext.setSchemaContext(schemaContext); - } - - @Override - public void onDatastoreContextUpdated(final DatastoreContextFactory contextFactory) { - LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreName()); - - actorContext.setDatastoreContext(contextFactory); - datastoreConfigMXBean.setContext(contextFactory.getBaseDatastoreContext()); - } - - @Override - @SuppressWarnings("checkstyle:IllegalCatch") public void close() { - LOG.info("Closing data store {}", identifier); - - if (datastoreConfigMXBean != null) { - datastoreConfigMXBean.unregisterMBean(); - } - if (datastoreInfoMXBean != null) { - datastoreInfoMXBean.unregisterMBean(); - } - - if (closeable != null) { - try { - closeable.close(); - } catch (Exception e) { - LOG.debug("Error closing instance", e); - } - } - txContextFactory.close(); - actorContext.shutdown(); - - if (client != null) { - client.close(); - } - } - - @Override - public ActorContext getActorContext() { - return actorContext; - } - - public void waitTillReady() { - LOG.info("Beginning to wait for data store to become ready : {}", identifier); - - try { - if (waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS)) { - LOG.debug("Data store {} is now ready", identifier); - } else { - LOG.error("Shard leaders failed to settle in {} seconds, giving up", - TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis)); - } - } catch (InterruptedException e) { - LOG.error("Interrupted while waiting for shards to settle", e); - } - } - - @SuppressWarnings("checkstyle:IllegalCatch") - private static ActorRef createShardManager(final ActorSystem actorSystem, final ShardManagerCreator creator, - final String shardDispatcher, final String shardManagerId) { - Exception lastException = null; - - for (int i = 0; i < 100; i++) { - try { - return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher).withMailbox( - ActorContext.BOUNDED_MAILBOX), shardManagerId); - } catch (Exception e) { - lastException = e; - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - LOG.debug("Could not create actor {} because of {} - waiting for sometime before retrying " - + "(retry count = {})", shardManagerId, e.getMessage(), i); - } - } - - throw new IllegalStateException("Failed to create Shard Manager", lastException); - } - - @VisibleForTesting - public CountDownLatch getWaitTillReadyCountDownLatch() { - return waitTillReadyCountDownLatch; + super.close(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java index 484a400a57..be6d2d5c4b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSystem; import org.opendaylight.controller.cluster.ActorSystemProvider; +import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; @@ -20,9 +21,9 @@ import org.slf4j.LoggerFactory; public class DistributedDataStoreFactory { private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreFactory.class); - public static DistributedDataStore createInstance(SchemaService schemaService, - DatastoreContext datastoreContext, DatastoreSnapshotRestore datastoreSnapshotRestore, - ActorSystemProvider actorSystemProvider, BundleContext bundleContext) { + public static AbstractDataStore createInstance(final SchemaService schemaService, + final DatastoreContext datastoreContext, final DatastoreSnapshotRestore datastoreSnapshotRestore, + final ActorSystemProvider actorSystemProvider, final BundleContext bundleContext) { LOG.info("Create data store instance of type : {}", datastoreContext.getDataStoreName()); @@ -34,8 +35,12 @@ public class DistributedDataStoreFactory { introspector, bundleContext); Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf"); - final DistributedDataStore dataStore = new DistributedDataStore(actorSystem, - new ClusterWrapperImpl(actorSystem), config, introspector.newContextFactory(), restoreFromSnapshot); + ClusterWrapper clusterWrapper = new ClusterWrapperImpl(actorSystem); + DatastoreContextFactory contextFactory = introspector.newContextFactory(); + + final AbstractDataStore dataStore = datastoreContext.isUseTellBasedProtocol() + ? new ClientBackedDataStore(actorSystem, clusterWrapper, config, contextFactory, restoreFromSnapshot) : + new DistributedDataStore(actorSystem, clusterWrapper, config, contextFactory, restoreFromSnapshot); overlay.setListener(dataStore); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java index e30b324a46..18f5681386 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java @@ -93,6 +93,7 @@ public class DistributedConfigDataStoreProviderModule extends AbstractDistribute .transactionDebugContextEnabled(props.getTransactionDebugContextEnabled()) .customRaftPolicyImplementation(props.getCustomRaftPolicyImplementation()) .shardSnapshotChunkSize(props.getShardSnapshotChunkSize().getValue().intValue()) + .useTellBasedProtocol(props.getUseTellBasedProtocol()) .build(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java index 0597aba5e2..ce4e543243 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java @@ -94,6 +94,7 @@ public class DistributedOperationalDataStoreProviderModule .transactionDebugContextEnabled(props.getTransactionDebugContextEnabled()) .customRaftPolicyImplementation(props.getCustomRaftPolicyImplementation()) .shardSnapshotChunkSize(props.getShardSnapshotChunkSize().getValue().intValue()) + .useTellBasedProtocol(props.getUseTellBasedProtocol()) .build(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang index 285de0ee01..2ae5f18785 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang @@ -66,157 +66,157 @@ module distributed-datastore-provider { default 1000; type non-zero-uint32-type; description "The maximum queue size for each shard's data store data change notification executor."; - } + } - leaf max-shard-data-change-executor-pool-size { + leaf max-shard-data-change-executor-pool-size { default 20; type non-zero-uint32-type; description "The maximum thread pool size for each shard's data store data change notification executor."; - } + } - leaf max-shard-data-change-listener-queue-size { + leaf max-shard-data-change-listener-queue-size { default 1000; type non-zero-uint32-type; description "The maximum queue size for each shard's data store data change listener."; - } + } - leaf max-shard-data-store-executor-queue-size { + leaf max-shard-data-store-executor-queue-size { default 5000; type non-zero-uint32-type; description "The maximum queue size for each shard's data store executor."; - } + } - leaf shard-transaction-idle-timeout-in-minutes { + leaf shard-transaction-idle-timeout-in-minutes { default 10; type non-zero-uint32-type; description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs."; - } + } - leaf shard-snapshot-batch-count { + leaf shard-snapshot-batch-count { default 20000; type non-zero-uint32-type; description "The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken."; - } + } - leaf shard-snapshot-data-threshold-percentage { + leaf shard-snapshot-data-threshold-percentage { default 12; type percentage; description "The percentage of Runtime.maxMemory() used by the in-memory journal log before a snapshot is to be taken"; - } + } - leaf shard-heartbeat-interval-in-millis { + leaf shard-heartbeat-interval-in-millis { default 500; type heartbeat-interval-type; description "The interval at which a shard will send a heart beat message to its remote shard."; - } + } - leaf shard-election-timeout-factor { + leaf shard-election-timeout-factor { default 20; type non-zero-uint32-type; description "The multiplication factor to be used to determine shard election timeout. The shard election timeout is determined by multiplying shard-heartbeat-interval-in-millis with the shard-election-timeout-factor"; - } + } - leaf operation-timeout-in-seconds { + leaf operation-timeout-in-seconds { default 5; type operation-timeout-type; description "The maximum amount of time for akka operations (remote or local) to complete before failing."; - } + } - leaf shard-journal-recovery-log-batch-size { + leaf shard-journal-recovery-log-batch-size { default 1; type non-zero-uint32-type; description "The maximum number of journal log entries to batch on recovery for a shard before committing to the data store."; - } + } - leaf shard-transaction-commit-timeout-in-seconds { + leaf shard-transaction-commit-timeout-in-seconds { default 30; type non-zero-uint32-type; description "The maximum amount of time a shard transaction three-phase commit can be idle without receiving the next messages before it aborts the transaction"; - } + } - leaf shard-transaction-commit-queue-capacity { + leaf shard-transaction-commit-queue-capacity { default 50000; type non-zero-uint32-type; description "The maximum allowed capacity for each shard's transaction commit queue."; - } - - leaf shard-commit-queue-expiry-timeout-in-seconds { - default 120; // 2 minutes - type non-zero-uint32-type; - description "The maximum amount of time a transaction can remain in a shard's commit queue waiting - to begin the CanCommit phase as coordinated by the broker front-end. Normally this should be - quick but latencies can occur in between transaction ready and CanCommit or a remote broker - could lose connection and CanCommit might never occur. Expiring transactions from the queue - allows subsequent pending transaction to be processed."; - } - - leaf shard-initialization-timeout-in-seconds { + } + + leaf shard-commit-queue-expiry-timeout-in-seconds { + default 120; // 2 minutes + type non-zero-uint32-type; + description "The maximum amount of time a transaction can remain in a shard's commit queue waiting + to begin the CanCommit phase as coordinated by the broker front-end. Normally this should be + quick but latencies can occur in between transaction ready and CanCommit or a remote broker + could lose connection and CanCommit might never occur. Expiring transactions from the queue + allows subsequent pending transaction to be processed."; + } + + leaf shard-initialization-timeout-in-seconds { default 300; // 5 minutes type non-zero-uint32-type; description "The maximum amount of time to wait for a shard to initialize from persistence on startup before failing an operation (eg transaction create and change listener registration)."; - } + } - leaf shard-leader-election-timeout-in-seconds { + leaf shard-leader-election-timeout-in-seconds { default 30; type non-zero-uint32-type; description "The maximum amount of time to wait for a shard to elect a leader before failing an operation (eg transaction create)."; - } + } - leaf shard-batched-modification-count { + leaf shard-batched-modification-count { default 1000; type non-zero-uint32-type; description "The number of transaction modification operations (put, merge, delete) to batch before sending to the shard transaction actor. Batching improves performance as less modifications messages are sent to the actor and thus lessens the chance that the transaction actor's mailbox queue could get full."; - } + } - leaf enable-metric-capture { + leaf enable-metric-capture { default false; type boolean; description "Enable or disable metric capture."; - } + } - leaf bounded-mailbox-capacity { - default 1000; - type non-zero-uint32-type; - description "Max queue size that an actor's mailbox can reach"; - } + leaf bounded-mailbox-capacity { + default 1000; + type non-zero-uint32-type; + description "Max queue size that an actor's mailbox can reach"; + } - leaf persistent { + leaf persistent { default true; type boolean; description "Enable or disable data persistence"; - } + } - leaf shard-isolated-leader-check-interval-in-millis { + leaf shard-isolated-leader-check-interval-in-millis { default 5000; type heartbeat-interval-type; description "The interval at which the leader of the shard will check if its majority followers are active and term itself as isolated"; - } + } - leaf transaction-creation-initial-rate-limit { + leaf transaction-creation-initial-rate-limit { default 100; type non-zero-uint32-type; description "The initial number of transactions per second that are allowed before the data store should begin applying back pressure. This number is only used as an initial guidance, subsequently the datastore measures the latency for a commit and auto-adjusts the rate limit"; - } - - leaf transaction-debug-context-enabled { - default false; - type boolean; - description "Enable or disable transaction context debug. This will log the call site trace for - transactions that fail"; - } - - leaf custom-raft-policy-implementation { + } + + leaf transaction-debug-context-enabled { + default false; + type boolean; + description "Enable or disable transaction context debug. This will log the call site trace for + transactions that fail"; + } + + leaf custom-raft-policy-implementation { default ""; type string; description "A fully qualified java class name. The class should implement @@ -225,14 +225,21 @@ module distributed-datastore-provider { reflection. For now let's assume that these classes to customize raft behaviors should be present in the distributed data store module itself. If this property is set to a class which cannot be found then the default raft behavior will be applied"; - } + } - leaf shard-snapshot-chunk-size { + leaf shard-snapshot-chunk-size { default 2048000; type non-zero-uint32-type; - description "When sending a snapshot to a follower, this is the maximum size in bytes for + description "When sending a snapshot to a follower, this is the maximum size in bytes for a chunk of data."; - } + } + + leaf use-tell-based-protocol { + default false; + type boolean; + description "Use a newer protocol between the frontend and backend. This feature is considered + exprerimental at this point."; + } } // Augments the 'configuration' choice node under modules/module. diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java index 21c8497db6..301f65f3a0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java @@ -86,7 +86,7 @@ public class DataTreeCohortIntegrationTest { ArgumentCaptor candidateCapt = ArgumentCaptor.forClass(DOMDataTreeCandidate.class); new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (final DistributedDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest", + try (final AbstractDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest", "test-1")) { final ObjectRegistration cohortReg = dataStore.registerCommitCohort(TEST_ID, cohort); @@ -122,7 +122,7 @@ public class DataTreeCohortIntegrationTest { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (final DistributedDataStore dataStore = + try (final AbstractDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest", "test-1")) { dataStore.registerCommitCohort(TEST_ID, failedCohort); Thread.sleep(1000); // Registration is asynchronous @@ -157,7 +157,7 @@ public class DataTreeCohortIntegrationTest { Mockito.doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(stepToAbort).abort(); new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (final DistributedDataStore dataStore = + try (final AbstractDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest", "test-1")) { dataStore.registerCommitCohort(TEST_ID, cohortToAbort); Thread.sleep(1000); // Registration is asynchronous diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 4d81442ba6..9b97104d97 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -115,7 +115,7 @@ public class DistributedDataStoreIntegrationTest { public void testWriteTransactionWithSingleShard() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (DistributedDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest", + try (AbstractDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest", "test-1")) { testWriteTransaction(dataStore, TestModel.TEST_PATH, @@ -132,7 +132,7 @@ public class DistributedDataStoreIntegrationTest { public void testWriteTransactionWithMultipleShards() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (DistributedDataStore dataStore = setupDistributedDataStore( + try (AbstractDataStore dataStore = setupDistributedDataStore( "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) { DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); @@ -183,7 +183,7 @@ public class DistributedDataStoreIntegrationTest { System.setProperty("shard.persistent", "true"); new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (DistributedDataStore dataStore = setupDistributedDataStore( + try (AbstractDataStore dataStore = setupDistributedDataStore( "testReadWriteTransactionWithSingleShard", "test-1")) { // 1. Create a read-write Tx @@ -230,7 +230,7 @@ public class DistributedDataStoreIntegrationTest { public void testReadWriteTransactionWithMultipleShards() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (DistributedDataStore dataStore = setupDistributedDataStore( + try (AbstractDataStore dataStore = setupDistributedDataStore( "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) { DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); @@ -288,7 +288,7 @@ public class DistributedDataStoreIntegrationTest { public void testSingleTransactionsWritesInQuickSuccession() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (DistributedDataStore dataStore = setupDistributedDataStore( + try (AbstractDataStore dataStore = setupDistributedDataStore( "testSingleTransactionsWritesInQuickSuccession", "cars-1")) { DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); @@ -331,7 +331,7 @@ public class DistributedDataStoreIntegrationTest { CountDownLatch blockRecoveryLatch = new CountDownLatch(1); InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); - try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { + try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { // Create the write Tx @@ -437,7 +437,7 @@ public class DistributedDataStoreIntegrationTest { CountDownLatch blockRecoveryLatch = new CountDownLatch(1); InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); - try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { + try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { // Create the read-write Tx @@ -520,7 +520,7 @@ public class DistributedDataStoreIntegrationTest { InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence"); - try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { + try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { // Create the write Tx @@ -598,7 +598,7 @@ public class DistributedDataStoreIntegrationTest { InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence"); - try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { + try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { // Create the read-write Tx @@ -675,7 +675,7 @@ public class DistributedDataStoreIntegrationTest { datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1) .shardInitializationTimeout(200, TimeUnit.MILLISECONDS); - try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { + try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { Object result = dataStore.getActorContext().executeOperation( dataStore.getActorContext().getShardManager(), new FindLocalShard(shardName, true)); @@ -753,7 +753,7 @@ public class DistributedDataStoreIntegrationTest { public void testTransactionAbort() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (DistributedDataStore dataStore = setupDistributedDataStore("transactionAbortIntegrationTest", + try (AbstractDataStore dataStore = setupDistributedDataStore("transactionAbortIntegrationTest", "test-1")) { DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); @@ -779,7 +779,7 @@ public class DistributedDataStoreIntegrationTest { public void testTransactionChainWithSingleShard() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", + try (AbstractDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1")) { // 1. Create a Tx chain and write-only Tx @@ -875,7 +875,7 @@ public class DistributedDataStoreIntegrationTest { public void testTransactionChainWithMultipleShards() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (DistributedDataStore dataStore = setupDistributedDataStore( + try (AbstractDataStore dataStore = setupDistributedDataStore( "testTransactionChainWithMultipleShards", "cars-1", "people-1")) { DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); @@ -943,7 +943,7 @@ public class DistributedDataStoreIntegrationTest { public void testCreateChainedTransactionsInQuickSuccession() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (DistributedDataStore dataStore = setupDistributedDataStore( + try (AbstractDataStore dataStore = setupDistributedDataStore( "testCreateChainedTransactionsInQuickSuccession", "cars-1")) { ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( @@ -992,7 +992,7 @@ public class DistributedDataStoreIntegrationTest { public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (DistributedDataStore dataStore = setupDistributedDataStore( + try (AbstractDataStore dataStore = setupDistributedDataStore( "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) { DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); @@ -1016,7 +1016,7 @@ public class DistributedDataStoreIntegrationTest { public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (DistributedDataStore dataStore = setupDistributedDataStore( + try (AbstractDataStore dataStore = setupDistributedDataStore( "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) { final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); @@ -1040,7 +1040,7 @@ public class DistributedDataStoreIntegrationTest { public void testCreateChainedTransactionAfterClose() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (DistributedDataStore dataStore = setupDistributedDataStore( + try (AbstractDataStore dataStore = setupDistributedDataStore( "testCreateChainedTransactionAfterClose", "test-1")) { DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); @@ -1060,7 +1060,7 @@ public class DistributedDataStoreIntegrationTest { public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (DistributedDataStore dataStore = setupDistributedDataStore( + try (AbstractDataStore dataStore = setupDistributedDataStore( "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) { final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); @@ -1108,7 +1108,7 @@ public class DistributedDataStoreIntegrationTest { public void testChainedTransactionFailureWithSingleShard() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (DistributedDataStore dataStore = setupDistributedDataStore( + try (AbstractDataStore dataStore = setupDistributedDataStore( "testChainedTransactionFailureWithSingleShard", "cars-1")) { ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( @@ -1148,7 +1148,7 @@ public class DistributedDataStoreIntegrationTest { public void testChainedTransactionFailureWithMultipleShards() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (DistributedDataStore dataStore = setupDistributedDataStore( + try (AbstractDataStore dataStore = setupDistributedDataStore( "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) { ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( @@ -1194,7 +1194,7 @@ public class DistributedDataStoreIntegrationTest { public void testChangeListenerRegistration() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (DistributedDataStore dataStore = setupDistributedDataStore("testChangeListenerRegistration", + try (AbstractDataStore dataStore = setupDistributedDataStore("testChangeListenerRegistration", "test-1")) { testWriteTransaction(dataStore, TestModel.TEST_PATH, @@ -1274,7 +1274,7 @@ public class DistributedDataStoreIntegrationTest { new DatastoreSnapshot.ShardSnapshot("people", org.apache.commons.lang3.SerializationUtils.serialize(peopleSnapshot)))); - try (DistributedDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf", + try (AbstractDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf", true, "cars", "people")) { DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index e157e429e1..7474690bca 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -124,8 +124,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private final TransactionIdentifier tx1 = nextTransactionId(); private final TransactionIdentifier tx2 = nextTransactionId(); - private DistributedDataStore followerDistributedDataStore; - private DistributedDataStore leaderDistributedDataStore; + private AbstractDataStore followerDistributedDataStore; + private AbstractDataStore leaderDistributedDataStore; private IntegrationTestKit followerTestKit; private IntegrationTestKit leaderTestKit; @@ -155,15 +155,15 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { JavaTestKit.shutdownActorSystem(follower2System); } - private void initDatastoresWithCars(String type) { + private void initDatastoresWithCars(final String type) { initDatastores(type, MODULE_SHARDS_CARS_ONLY_1_2, CARS); } - private void initDatastoresWithCarsAndPeople(String type) { + private void initDatastoresWithCarsAndPeople(final String type) { initDatastores(type, MODULE_SHARDS_CARS_PEOPLE_1_2, CARS_AND_PEOPLE); } - private void initDatastores(String type, String moduleShardsConfig, String[] shards) { + private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards) { leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder); leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, shards); @@ -175,7 +175,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards); } - private static void verifyCars(DOMStoreReadTransaction readTx, MapEntryNode... entries) throws Exception { + private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries) + throws Exception { Optional> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); @@ -187,14 +188,15 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertEquals("Car list node", listBuilder.build(), optional.get()); } - private static void verifyNode(DOMStoreReadTransaction readTx, YangInstanceIdentifier path, - NormalizedNode expNode) throws Exception { + private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path, + final NormalizedNode expNode) throws Exception { Optional> optional = readTx.read(path).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); assertEquals("Data node", expNode, optional.get()); } - private static void verifyExists(DOMStoreReadTransaction readTx, YangInstanceIdentifier path) throws Exception { + private static void verifyExists(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path) + throws Exception { Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS); assertEquals("exists", true, exists); } @@ -250,7 +252,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load().getConfig("Member2")); - try (DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder) + try (AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder) .setupDistributedDataStore(testName, "module-shards-member2", true, CARS_AND_PEOPLE)) { verifyCars(member2Datastore.newReadOnlyTransaction(), car2); } @@ -538,7 +540,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder); - try (DistributedDataStore ds = + try (AbstractDataStore ds = newMember1TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) { followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), CARS); @@ -805,7 +807,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE); IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder); - try (DistributedDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName, + try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) { // Create and submit a couple tx's so they're pending. @@ -980,7 +982,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, follower2DatastoreContextBuilder); - try (DistributedDataStore ds = + try (AbstractDataStore ds = follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS)) { followerTestKit.waitForMembersUp("member-1", "member-3"); @@ -1008,7 +1010,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } } - private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) { + private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) { final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build()); DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class); Answer answer = invocation -> newBuilder.build(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java index f09441e07e..1d501e1cdb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java @@ -49,7 +49,7 @@ public class IntegrationTestKit extends ShardTestKit { protected DatastoreContext.Builder datastoreContextBuilder; protected DatastoreSnapshot restoreFromSnapshot; - public IntegrationTestKit(ActorSystem actorSystem, Builder datastoreContextBuilder) { + public IntegrationTestKit(final ActorSystem actorSystem, final Builder datastoreContextBuilder) { super(actorSystem); this.datastoreContextBuilder = datastoreContextBuilder; } @@ -58,23 +58,23 @@ public class IntegrationTestKit extends ShardTestKit { return datastoreContextBuilder; } - public DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) { + public AbstractDataStore setupDistributedDataStore(final String typeName, final String... shardNames) { return setupDistributedDataStore(typeName, "module-shards.conf", true, SchemaContextHelper.full(), shardNames); } - public DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader, - String... shardNames) { + public AbstractDataStore setupDistributedDataStore(final String typeName, final boolean waitUntilLeader, + final String... shardNames) { return setupDistributedDataStore(typeName, "module-shards.conf", waitUntilLeader, SchemaContextHelper.full(), shardNames); } - public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig, + public AbstractDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig, final boolean waitUntilLeader, final String... shardNames) { return setupDistributedDataStore(typeName, moduleShardsConfig, waitUntilLeader, SchemaContextHelper.full(), shardNames); } - public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig, + public AbstractDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig, final boolean waitUntilLeader, final SchemaContext schemaContext, final String... shardNames) { final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem()); final Configuration config = new ConfigurationImpl(moduleShardsConfig, "modules.conf"); @@ -86,7 +86,7 @@ public class IntegrationTestKit extends ShardTestKit { Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext(); Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString()); - DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, mockContextFactory, + AbstractDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, mockContextFactory, restoreFromSnapshot); dataStore.onGlobalContextUpdated(schemaContext); @@ -99,7 +99,7 @@ public class IntegrationTestKit extends ShardTestKit { return dataStore; } - public void waitUntilLeader(ActorContext actorContext, String... shardNames) { + public void waitUntilLeader(final ActorContext actorContext, final String... shardNames) { for (String shardName: shardNames) { ActorRef shard = findLocalShard(actorContext, shardName); @@ -109,7 +109,7 @@ public class IntegrationTestKit extends ShardTestKit { } } - public void waitUntilNoLeader(ActorContext actorContext, String... shardNames) { + public void waitUntilNoLeader(final ActorContext actorContext, final String... shardNames) { for (String shardName: shardNames) { ActorRef shard = findLocalShard(actorContext, shardName); assertNotNull("No local shard found for " + shardName, shard); @@ -118,7 +118,7 @@ public class IntegrationTestKit extends ShardTestKit { } } - public void waitForMembersUp(String... otherMembers) { + public void waitForMembersUp(final String... otherMembers) { Set otherMembersSet = Sets.newHashSet(otherMembers); Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.SECONDS) <= 10) { @@ -136,7 +136,7 @@ public class IntegrationTestKit extends ShardTestKit { fail("Member(s) " + otherMembersSet + " are not Up"); } - public static ActorRef findLocalShard(ActorContext actorContext, String shardName) { + public static ActorRef findLocalShard(final ActorContext actorContext, final String shardName) { ActorRef shard = null; for (int i = 0; i < 20 * 5 && shard == null; i++) { Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); @@ -148,8 +148,8 @@ public class IntegrationTestKit extends ShardTestKit { return shard; } - public static void verifyShardStats(DistributedDataStore datastore, String shardName, ShardStatsVerifier verifier) - throws Exception { + public static void verifyShardStats(final AbstractDataStore datastore, final String shardName, + final ShardStatsVerifier verifier) throws Exception { ActorContext actorContext = datastore.getActorContext(); Future future = actorContext.findLocalShardAsync(shardName); @@ -173,8 +173,8 @@ public class IntegrationTestKit extends ShardTestKit { throw lastError; } - void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath, - NormalizedNode nodeToWrite) throws Exception { + void testWriteTransaction(final AbstractDataStore dataStore, final YangInstanceIdentifier nodePath, + final NormalizedNode nodeToWrite) throws Exception { // 1. Create a write-only Tx @@ -218,7 +218,7 @@ public class IntegrationTestKit extends ShardTestKit { } @SuppressWarnings("checkstyle:IllegalCatch") - void assertExceptionOnCall(Callable callable, Class expType) + void assertExceptionOnCall(final Callable callable, final Class expType) throws Exception { try { callable.call(); @@ -229,13 +229,10 @@ public class IntegrationTestKit extends ShardTestKit { } void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain, - Class expType) throws Exception { - assertExceptionOnCall(new Callable() { - @Override - public Void call() throws Exception { - txChain.newWriteOnlyTransaction(); - return null; - } + final Class expType) throws Exception { + assertExceptionOnCall(() -> { + txChain.newWriteOnlyTransaction(); + return null; }, expType); assertExceptionOnCall(() -> { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java index db6c5e664d..bb2ce19dee 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java @@ -50,8 +50,8 @@ public class MemberNode { static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); private IntegrationTestKit kit; - private DistributedDataStore configDataStore; - private DistributedDataStore operDataStore; + private AbstractDataStore configDataStore; + private AbstractDataStore operDataStore; private DatastoreContext.Builder datastoreContextBuilder; private boolean cleanedUp; @@ -62,7 +62,7 @@ public class MemberNode { * callers to cleanup instances on test completion. * @return a Builder instance */ - public static Builder builder(List members) { + public static Builder builder(final List members) { return new Builder(members); } @@ -71,12 +71,12 @@ public class MemberNode { } - public DistributedDataStore configDataStore() { + public AbstractDataStore configDataStore() { return configDataStore; } - public DistributedDataStore operDataStore() { + public AbstractDataStore operDataStore() { return operDataStore; } @@ -84,11 +84,11 @@ public class MemberNode { return datastoreContextBuilder; } - public void waitForMembersUp(String... otherMembers) { + public void waitForMembersUp(final String... otherMembers) { kit.waitForMembersUp(otherMembers); } - public void waitForMemberDown(String member) { + public void waitForMemberDown(final String member) { Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.SECONDS) <= 10) { CurrentClusterState state = Cluster.get(kit.getSystem()).state(); @@ -124,8 +124,8 @@ public class MemberNode { } } - public static void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier) - throws Exception { + public static void verifyRaftState(final AbstractDataStore datastore, final String shardName, + final RaftStateVerifier verifier) throws Exception { ActorContext actorContext = datastore.getActorContext(); Future future = actorContext.findLocalShardAsync(shardName); @@ -149,8 +149,8 @@ public class MemberNode { throw lastError; } - public static void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName, - String... peerMemberNames) throws Exception { + public static void verifyRaftPeersPresent(final AbstractDataStore datastore, final String shardName, + final String... peerMemberNames) throws Exception { final Set peerIds = Sets.newHashSet(); for (String p: peerMemberNames) { peerIds.add(ShardIdentifier.create(shardName, MemberName.forName(p), @@ -161,7 +161,7 @@ public class MemberNode { raftState.getPeerAddresses().keySet())); } - public static void verifyNoShardPresent(DistributedDataStore datastore, String shardName) { + public static void verifyNoShardPresent(final AbstractDataStore datastore, final String shardName) { Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.SECONDS) <= 5) { Optional shardReply = datastore.getActorContext().findLocalShard(shardName); @@ -186,7 +186,7 @@ public class MemberNode { private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder() .shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30); - Builder(List members) { + Builder(final List members) { this.members = members; } @@ -195,7 +195,7 @@ public class MemberNode { * * @return this Builder */ - public Builder moduleShardsConfig(String newModuleShardsConfig) { + public Builder moduleShardsConfig(final String newModuleShardsConfig) { this.moduleShardsConfig = newModuleShardsConfig; return this; } @@ -205,7 +205,7 @@ public class MemberNode { * * @return this Builder */ - public Builder akkaConfig(String newAkkaConfig) { + public Builder akkaConfig(final String newAkkaConfig) { this.akkaConfig = newAkkaConfig; return this; } @@ -215,7 +215,7 @@ public class MemberNode { * * @return this Builder */ - public Builder testName(String newTestName) { + public Builder testName(final String newTestName) { this.testName = newTestName; return this; } @@ -225,7 +225,7 @@ public class MemberNode { * * @return this Builder */ - public Builder waitForShardLeader(String... shardNames) { + public Builder waitForShardLeader(final String... shardNames) { this.waitForshardLeader = shardNames; return this; } @@ -235,7 +235,7 @@ public class MemberNode { * * @return this Builder */ - public Builder createOperDatastore(boolean value) { + public Builder createOperDatastore(final boolean value) { this.createOperDatastore = value; return this; } @@ -245,7 +245,7 @@ public class MemberNode { * * @return this Builder */ - public Builder schemaContext(SchemaContext newSchemaContext) { + public Builder schemaContext(final SchemaContext newSchemaContext) { this.schemaContext = newSchemaContext; return this; } @@ -255,7 +255,7 @@ public class MemberNode { * * @return this Builder */ - public Builder datastoreContextBuilder(DatastoreContext.Builder builder) { + public Builder datastoreContextBuilder(final DatastoreContext.Builder builder) { datastoreContextBuilder = builder; return this; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java index 0bc811da0e..6ff4ad25ec 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java @@ -44,8 +44,8 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.DatastoreContext; -import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.IntegrationTestKit; import org.opendaylight.controller.cluster.datastore.MemberNode; import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig; @@ -122,7 +122,7 @@ public class DistributedEntityOwnershipIntegrationTest { memberNodes.clear(); } - private static DistributedEntityOwnershipService newOwnershipService(final DistributedDataStore datastore) { + private static DistributedEntityOwnershipService newOwnershipService(final AbstractDataStore datastore) { return DistributedEntityOwnershipService.start(datastore.getActorContext(), EntityOwnerSelectionStrategyConfig.newBuilder().build()); } @@ -142,7 +142,7 @@ public class DistributedEntityOwnershipIntegrationTest { .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false) .datastoreContextBuilder(followerDatastoreContextBuilder).build(); - DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore(); + AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore(); leaderDistributedDataStore.waitTillReady(); follower1Node.configDataStore().waitTillReady(); @@ -283,7 +283,7 @@ public class DistributedEntityOwnershipIntegrationTest { .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false) .datastoreContextBuilder(followerDatastoreContextBuilder).build(); - DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore(); + AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore(); leaderDistributedDataStore.waitTillReady(); follower1Node.configDataStore().waitTillReady(); @@ -368,7 +368,7 @@ public class DistributedEntityOwnershipIntegrationTest { .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT) .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build(); - DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore(); + AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore(); leaderDistributedDataStore.waitTillReady(); follower1Node.configDataStore().waitTillReady(); @@ -457,7 +457,7 @@ public class DistributedEntityOwnershipIntegrationTest { .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false) .datastoreContextBuilder(followerDatastoreContextBuilder).build(); - DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore(); + AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore(); leaderDistributedDataStore.waitTillReady(); follower1Node.configDataStore().waitTillReady(); @@ -538,7 +538,7 @@ public class DistributedEntityOwnershipIntegrationTest { .moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false) .datastoreContextBuilder(leaderDatastoreContextBuilder).build(); - DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore(); + AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore(); final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore); leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME); @@ -547,7 +547,7 @@ public class DistributedEntityOwnershipIntegrationTest { .moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false) .datastoreContextBuilder(followerDatastoreContextBuilder).build(); - DistributedDataStore follower1DistributedDataStore = follower1Node.configDataStore(); + AbstractDataStore follower1DistributedDataStore = follower1Node.configDataStore(); follower1DistributedDataStore.waitTillReady(); leaderNode.waitForMembersUp("member-2"); @@ -616,7 +616,7 @@ public class DistributedEntityOwnershipIntegrationTest { .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false) .datastoreContextBuilder(followerDatastoreContextBuilder).build(); - DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore(); + AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore(); leaderDistributedDataStore.waitTillReady(); follower1Node.configDataStore().waitTillReady(); @@ -659,7 +659,7 @@ public class DistributedEntityOwnershipIntegrationTest { .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false) .datastoreContextBuilder(followerDatastoreContextBuilder).build(); - DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore(); + AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore(); leaderDistributedDataStore.waitTillReady(); follower1Node.configDataStore().waitTillReady(); @@ -694,7 +694,7 @@ public class DistributedEntityOwnershipIntegrationTest { assertEquals("EntityOwnershipState", expState, state.get()); } - private static void verifyCandidates(final DistributedDataStore dataStore, final DOMEntity entity, + private static void verifyCandidates(final AbstractDataStore dataStore, final DOMEntity entity, final String... expCandidates) throws Exception { AssertionError lastError = null; Stopwatch sw = Stopwatch.createStarted(); @@ -721,7 +721,7 @@ public class DistributedEntityOwnershipIntegrationTest { } @SuppressWarnings("checkstyle:IllegalCatch") - private static void verifyOwner(final DistributedDataStore dataStore, final DOMEntity entity, + private static void verifyOwner(final AbstractDataStore dataStore, final DOMEntity entity, final String expOwner) { AbstractEntityOwnershipTest.verifyOwner(expOwner, entity.getType(), entity.getIdentifier(), path -> { try { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java index 711706a5c4..aac4b23d3a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java @@ -39,6 +39,7 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; @@ -80,7 +81,7 @@ public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnersh static int ID_COUNTER = 1; private final String dataStoreName = "config" + ID_COUNTER++; - private DistributedDataStore dataStore; + private AbstractDataStore dataStore; @Before public void setUp() { -- 2.36.6