import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.MemberNode;
import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
import org.opendaylight.controller.cluster.datastore.Shard;
verifyFailedRpcResult(rpcResult);
}
- private static NormalizedNode<?, ?> writeCarsNodeAndVerify(DistributedDataStore writeToStore,
- DistributedDataStore readFromStore) throws Exception {
+ private static NormalizedNode<?, ?> writeCarsNodeAndVerify(AbstractDataStore writeToStore,
+ AbstractDataStore readFromStore) throws Exception {
DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
NormalizedNode<?, ?> carsNode = CarsModel.create();
writeTx.write(CarsModel.BASE_PATH, carsNode);
return carsNode;
}
- private static void readCarsNodeAndVerify(DistributedDataStore readFromStore,
+ private static void readCarsNodeAndVerify(AbstractDataStore readFromStore,
NormalizedNode<?, ?> expCarsNode) throws Exception {
Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction()
.read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS);
successShardResult("cars", DataStoreType.Operational),
successShardResult("people", DataStoreType.Operational));
- verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+ verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
replicaNode2.configDataStore(), replicaNode2.operDataStore(),
replicaNode3.configDataStore(), replicaNode3.operDataStore()},
new String[]{"cars", "people"}, new SimpleEntry<>("member-1", true),
successShardResult("cars", DataStoreType.Operational),
successShardResult("people", DataStoreType.Operational));
- verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+ verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
replicaNode2.configDataStore(), replicaNode2.operDataStore(),
replicaNode3.configDataStore(), replicaNode3.operDataStore()},
new String[]{"cars", "people"},
successShardResult("cars", DataStoreType.Operational),
successShardResult("people", DataStoreType.Operational));
- verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+ verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
replicaNode2.configDataStore(), replicaNode2.operDataStore(),
replicaNode3.configDataStore(), replicaNode3.operDataStore()},
new String[]{"cars", "people"},
successShardResult("cars", DataStoreType.Operational),
successShardResult("people", DataStoreType.Operational));
- verifyVotingStates(new DistributedDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
+ verifyVotingStates(new AbstractDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
replicaNode2.configDataStore(), replicaNode2.operDataStore(),
replicaNode3.configDataStore(), replicaNode3.operDataStore()},
new String[]{"cars", "people"},
successShardResult("people", DataStoreType.Operational));
// Members 2 and 3 are now non-voting but should get replicated with the new new server config.
- verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+ verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
replicaNode2.configDataStore(), replicaNode2.operDataStore(),
replicaNode3.configDataStore(), replicaNode3.operDataStore()},
new String[]{"cars", "people"},
});
}
- private void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig,
+ private static void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig,
String member, String datastoreTypeSuffix, String... shards) {
String[] datastoreTypes = {"config_", "oper_"};
for (String type : datastoreTypes) {
}
@SafeVarargs
- private static void verifyVotingStates(DistributedDataStore[] datastores, String[] shards,
+ private static void verifyVotingStates(AbstractDataStore[] datastores, String[] shards,
SimpleEntry<String, Boolean>... expStates) throws Exception {
- for (DistributedDataStore datastore: datastores) {
+ for (AbstractDataStore datastore: datastores) {
for (String shard: shards) {
verifyVotingStates(datastore, shard, expStates);
}
}
@SafeVarargs
- private static void verifyVotingStates(DistributedDataStore datastore, String shardName,
+ private static void verifyVotingStates(AbstractDataStore datastore, String shardName,
SimpleEntry<String, Boolean>... expStates) throws Exception {
String localMemberName = datastore.getActorContext().getCurrentMemberName().getName();
Map<String, Boolean> expStateMap = new HashMap<>();
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import akka.actor.ActorSystem;
+import com.google.common.annotations.VisibleForTesting;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
+import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+
+/**
+ * Implements a distributed DOMStore using ClientActor.
+ */
+public class ClientBackedDataStore extends AbstractDataStore {
+
+ public ClientBackedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
+ final Configuration configuration, final DatastoreContextFactory datastoreContextFactory,
+ final DatastoreSnapshot restoreFromSnapshot) {
+ super(actorSystem, cluster, configuration, datastoreContextFactory, restoreFromSnapshot);
+ }
+
+ @VisibleForTesting
+ ClientBackedDataStore(final ActorContext actorContext, final ClientIdentifier identifier) {
+ super(actorContext, identifier);
+ }
+
+ @Override
+ public DOMStoreTransactionChain createTransactionChain() {
+ return new ClientBackedTransactionChain(getClient().createLocalHistory());
+ }
+
+ @Override
+ public DOMStoreReadTransaction newReadOnlyTransaction() {
+ return new ClientBackedReadTransaction(getClient().createSnapshot(), null);
+ }
+
+ @Override
+ public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ return new ClientBackedWriteTransaction(getClient().createTransaction());
+ }
+
+ @Override
+ public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ return new ClientBackedReadWriteTransaction(getClient().createTransaction());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientSnapshot;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * An implementation of {@link DOMStoreReadTransaction} backed by a {@link ClientSnapshot}. Used for standalone
+ * transactions.
+ *
+ * @author Robert Varga
+ */
+final class ClientBackedReadTransaction extends ClientBackedTransaction<ClientSnapshot>
+ implements DOMStoreReadTransaction {
+ private static final AtomicReferenceFieldUpdater<ClientBackedReadTransaction, ClientBackedTransactionChain>
+ PARENT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ClientBackedReadTransaction.class,
+ ClientBackedTransactionChain.class, "parent");
+
+ @SuppressWarnings("unused")
+ private volatile ClientBackedTransactionChain parent;
+
+ ClientBackedReadTransaction(final ClientSnapshot delegate, @Nullable final ClientBackedTransactionChain parent) {
+ super(delegate);
+ this.parent = parent;
+ }
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
+ return Futures.makeChecked(delegate().read(path), ReadFailedException.MAPPER);
+ }
+
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
+ return Futures.makeChecked(delegate().exists(path), ReadFailedException.MAPPER);
+ }
+
+ @Override
+ public void close() {
+ super.close();
+
+ final ClientBackedTransactionChain local = PARENT_UPDATER.getAndSet(this, null);
+ if (local != null) {
+ local.snapshotClosed(delegate());
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * An implementation of {@link DOMStoreReadWriteTransaction} backed by a {@link ClientTransaction}.
+ *
+ * @author Robert Varga
+ */
+final class ClientBackedReadWriteTransaction extends ClientBackedWriteTransaction
+ implements DOMStoreReadWriteTransaction {
+
+ ClientBackedReadWriteTransaction(final ClientTransaction delegate) {
+ super(delegate);
+ }
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
+ return Futures.makeChecked(delegate().read(path), ReadFailedException.MAPPER);
+ }
+
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
+ return Futures.makeChecked(delegate().exists(path), ReadFailedException.MAPPER);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.FinalizablePhantomReference;
+import com.google.common.base.FinalizableReferenceQueue;
+import com.google.common.base.Preconditions;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.databroker.actors.dds.AbstractClientHandle;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of {@link DOMStoreTransaction} backed by a {@link ClientTransaction}. It guards against user-level
+ * leaks by maintaining a phantom reference on the backing transaction, which will ensure that the transaction will
+ * be aborted, if it is not otherwise closed, just before this object is garbage-collected.
+ *
+ * @author Robert Varga
+ */
+abstract class ClientBackedTransaction<T extends AbstractClientHandle<?>> extends
+ AbstractDOMStoreTransaction<TransactionIdentifier> {
+ private static final class Finalizer extends FinalizablePhantomReference<ClientBackedTransaction<?>> {
+ private static final FinalizableReferenceQueue QUEUE = new FinalizableReferenceQueue();
+ private static final Set<Finalizer> FINALIZERS = ConcurrentHashMap.newKeySet();
+ private static final Logger LOG = LoggerFactory.getLogger(Finalizer.class);
+
+ private final AbstractClientHandle<?> transaction;
+
+ private Finalizer(final ClientBackedTransaction<?> referent, final AbstractClientHandle<?> transaction) {
+ super(referent, QUEUE);
+ this.transaction = Preconditions.checkNotNull(transaction);
+ }
+
+ static @Nonnull <T extends AbstractClientHandle<?>> T recordTransaction(
+ @Nonnull final ClientBackedTransaction<T> referent, @Nonnull final T transaction) {
+ FINALIZERS.add(new Finalizer(referent, transaction));
+ return transaction;
+ }
+
+ @Override
+ public void finalizeReferent() {
+ FINALIZERS.remove(this);
+ if (transaction.abort()) {
+ LOG.warn("Aborted orphan transaction {}", transaction.getIdentifier());
+ }
+ }
+ }
+
+ private final T delegate;
+
+ ClientBackedTransaction(final T delegate) {
+ super(delegate.getIdentifier());
+ this.delegate = Finalizer.recordTransaction(this, delegate);
+ }
+
+ final T delegate() {
+ return delegate;
+ }
+
+ @Override
+ public void close() {
+ delegate.abort();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.WeakHashMap;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.databroker.actors.dds.AbstractClientHandle;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientSnapshot;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of {@link DOMStoreTransactionChain} backed by a {@link ClientLocalHistory}.
+ *
+ * @author Robert Varga
+ */
+final class ClientBackedTransactionChain implements DOMStoreTransactionChain {
+ private static final Logger LOG = LoggerFactory.getLogger(ClientBackedTransactionChain.class);
+
+ @GuardedBy("this")
+ private final Map<AbstractClientHandle<?>, Boolean> openSnapshots = new WeakHashMap<>();
+
+ private final ClientLocalHistory history;
+
+ ClientBackedTransactionChain(final ClientLocalHistory history) {
+ this.history = Preconditions.checkNotNull(history);
+ }
+
+ @Override
+ public DOMStoreReadTransaction newReadOnlyTransaction() {
+ return new ClientBackedReadTransaction(createSnapshot(), this);
+ }
+
+ @Override
+ public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ return new ClientBackedReadWriteTransaction(createTransaction());
+ }
+
+ @Override
+ public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ return new ClientBackedWriteTransaction(createTransaction());
+ }
+
+ @Override
+ public synchronized void close() {
+ for (AbstractClientHandle<?> snap : openSnapshots.keySet()) {
+ LOG.warn("Aborting unclosed transaction {}", snap.getIdentifier());
+ snap.abort();
+ }
+ openSnapshots.clear();
+
+ history.close();
+ }
+
+ synchronized void snapshotClosed(final ClientSnapshot clientTransaction) {
+ openSnapshots.remove(clientTransaction);
+ }
+
+ private ClientSnapshot createSnapshot() {
+ try {
+ return recordSnapshot(history.takeSnapshot());
+ } catch (org.opendaylight.mdsal.common.api.TransactionChainClosedException e) {
+ throw new TransactionChainClosedException("Transaction chain has been closed", e);
+ }
+ }
+
+ private ClientTransaction createTransaction() {
+ try {
+ return recordSnapshot(history.createTransaction());
+ } catch (org.opendaylight.mdsal.common.api.TransactionChainClosedException e) {
+ throw new TransactionChainClosedException("Transaction chain has been closed", e);
+ }
+ }
+
+ private synchronized <T extends AbstractClientHandle<?>> T recordSnapshot(final T snapshot) {
+ openSnapshots.put(snapshot, Boolean.TRUE);
+ return snapshot;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * An implementation of {@link DOMStoreWriteTransaction} backed by a {@link ClientTransaction}.
+ *
+ * @author Robert Varga
+ */
+class ClientBackedWriteTransaction extends ClientBackedTransaction<ClientTransaction>
+ implements DOMStoreWriteTransaction {
+ ClientBackedWriteTransaction(final ClientTransaction delegate) {
+ super(delegate);
+ }
+
+ @Override
+ public final void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ delegate().write(path, data);
+ }
+
+ @Override
+ public final void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ delegate().merge(path, data);
+ }
+
+ @Override
+ public final void delete(final YangInstanceIdentifier path) {
+ delegate().delete(path);
+ }
+
+ @Override
+ public final DOMStoreThreePhaseCommitCohort ready() {
+ return new DOMStoreThreePhaseCommitCohortAdaptor(delegate().ready());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ForwardingObject;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+
+/**
+ * Utility class from bridging {@link DOMStoreThreePhaseCommitCohort} and
+ * {@link org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort}.
+ *
+ * @author Robert Varga
+ */
+final class DOMStoreThreePhaseCommitCohortAdaptor extends ForwardingObject implements DOMStoreThreePhaseCommitCohort {
+ private final org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort delegate;
+
+ DOMStoreThreePhaseCommitCohortAdaptor(
+ final org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort delegate) {
+ this.delegate = Preconditions.checkNotNull(delegate);
+ }
+
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
+ return delegate.canCommit();
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ return delegate.preCommit();
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ return delegate.abort();
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ return delegate.commit();
+ }
+
+ @Override
+ protected Object delegate() {
+ return delegate;
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.databroker;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.mdsal.common.api.ReadFailedException;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * Proxy implementation of {@link DOMStoreReadTransaction}. It routes all requests to the backing
- * {@link ClientTransaction}. This class is not final to allow further subclassing by
- * {@link ShardedDOMStoreReadWriteTransaction}.
- *
- * @author Robert Varga
- */
-class ShardedDOMStoreReadTransaction extends AbstractShardedTransaction implements DOMStoreReadTransaction {
- ShardedDOMStoreReadTransaction(final ClientTransaction tx) {
- super(tx);
- }
-
- @Override
- public final void close() {
- transaction().abort();
- }
-
- @Override
- public final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
- final YangInstanceIdentifier path) {
- return transaction().read(path);
- }
-
- @Override
- public final CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
- return transaction().exists(path);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.databroker;
-
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * Proxy implementation of {@link DOMStoreReadWriteTransaction}. It routes all requests to the backing
- * {@link ClientTransaction}.
- *
- * @author Robert Varga
- */
-final class ShardedDOMStoreReadWriteTransaction extends ShardedDOMStoreReadTransaction
- implements DOMStoreReadWriteTransaction {
- ShardedDOMStoreReadWriteTransaction(final ClientTransaction tx) {
- super(tx);
- }
-
- @Override
- public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- transaction().write(path, data);
- }
-
- @Override
- public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- transaction().merge(path, data);
- }
-
- @Override
- public void delete(final YangInstanceIdentifier path) {
- transaction().delete(path);
- }
-
- @Override
- public DOMStoreThreePhaseCommitCohort ready() {
- return transaction().ready();
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.databroker;
-
-import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-
-/**
- * Implementation of {@link DOMStoreTransactionChain} backed by a {@link ClientLocalHistory}. It wraps
- * {@link ClientTransaction} into proxies like {@link ShardedDOMStoreReadTransaction} to provide isolation.
- *
- * @author Robert Varga
- */
-final class ShardedDOMStoreTransactionChain implements DOMStoreTransactionChain {
- private final ClientLocalHistory history;
-
- ShardedDOMStoreTransactionChain(final ClientLocalHistory history) {
- this.history = Preconditions.checkNotNull(history);
- }
-
- @Override
- public DOMStoreReadTransaction newReadOnlyTransaction() {
- return new ShardedDOMStoreReadTransaction(history.createTransaction());
- }
-
- @Override
- public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- return new ShardedDOMStoreReadWriteTransaction(history.createTransaction());
- }
-
- @Override
- public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- return new ShardedDOMStoreWriteTransaction(history.createTransaction());
- }
-
- @Override
- public void close() {
- history.close();
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.databroker;
-
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * Proxy implementation of {@link DOMStoreWriteTransaction}. It routes all requests to the backing
- * {@link ClientTransaction}.
- *
- * @author Robert Varga
- */
-final class ShardedDOMStoreWriteTransaction extends AbstractShardedTransaction implements DOMStoreWriteTransaction {
- ShardedDOMStoreWriteTransaction(final ClientTransaction tx) {
- super(tx);
- }
-
- @Override
- public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- transaction().write(path, data);
- }
-
- @Override
- public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- transaction().merge(path, data);
- }
-
- @Override
- public void delete(final YangInstanceIdentifier path) {
- transaction().delete(path);
- }
-
- @Override
- public DOMStoreThreePhaseCommitCohort ready() {
- return transaction().ready();
- }
-
- @Override
- public void close() {
- transaction().abort();
- }
-}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClientActor;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXBeanImpl;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base implementation of a distributed DOMStore.
+ */
+public abstract class AbstractDataStore implements DistributedDataStoreInterface, SchemaContextListener,
+ DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher,
+ DOMDataTreeCommitCohortRegistry, AutoCloseable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStore.class);
+
+ private static final long READY_WAIT_FACTOR = 3;
+
+ private final ActorContext actorContext;
+ private final long waitTillReadyTimeInMillis;
+
+ private AutoCloseable closeable;
+
+ private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
+
+ private DatastoreInfoMXBeanImpl datastoreInfoMXBean;
+
+ private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
+
+ private final ClientIdentifier identifier;
+ private final DataStoreClient client;
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ protected AbstractDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
+ final Configuration configuration, final DatastoreContextFactory datastoreContextFactory,
+ final DatastoreSnapshot restoreFromSnapshot) {
+ Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
+ Preconditions.checkNotNull(cluster, "cluster should not be null");
+ Preconditions.checkNotNull(configuration, "configuration should not be null");
+ Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
+
+ String shardManagerId = ShardManagerIdentifier.builder()
+ .type(datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()).build().toString();
+
+ LOG.info("Creating ShardManager : {}", shardManagerId);
+
+ String shardDispatcher =
+ new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
+
+ PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
+
+ ShardManagerCreator creator = new ShardManagerCreator().cluster(cluster).configuration(configuration)
+ .datastoreContextFactory(datastoreContextFactory)
+ .waitTillReadyCountDownLatch(waitTillReadyCountDownLatch)
+ .primaryShardInfoCache(primaryShardInfoCache)
+ .restoreFromSnapshot(restoreFromSnapshot);
+
+ actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher,
+ shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(),
+ primaryShardInfoCache);
+
+ final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(),
+ datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(), actorContext);
+ final ActorRef clientActor = actorSystem.actorOf(clientProps);
+ try {
+ client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Failed to get actor for {}", clientProps, e);
+ clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ throw Throwables.propagate(e);
+ }
+
+ identifier = client.getIdentifier();
+ LOG.debug("Distributed data store client {} started", identifier);
+
+ this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout()
+ .duration().toMillis() * READY_WAIT_FACTOR;
+
+ datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(
+ datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
+ datastoreConfigMXBean.setContext(datastoreContextFactory.getBaseDatastoreContext());
+ datastoreConfigMXBean.registerMBean();
+
+ datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContextFactory.getBaseDatastoreContext()
+ .getDataStoreMXBeanType(), actorContext);
+ datastoreInfoMXBean.registerMBean();
+ }
+
+ @VisibleForTesting
+ protected AbstractDataStore(final ActorContext actorContext, final ClientIdentifier identifier) {
+ this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+ this.client = null;
+ this.identifier = Preconditions.checkNotNull(identifier);
+ this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout()
+ .duration().toMillis() * READY_WAIT_FACTOR;
+ }
+
+ protected final DataStoreClient getClient() {
+ return client;
+ }
+
+ final ClientIdentifier getIdentifier() {
+ return identifier;
+ }
+
+ public void setCloseable(final AutoCloseable closeable) {
+ this.closeable = closeable;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+ ListenerRegistration<L> registerChangeListener(
+ final YangInstanceIdentifier path, final L listener,
+ final AsyncDataBroker.DataChangeScope scope) {
+
+ Preconditions.checkNotNull(path, "path should not be null");
+ Preconditions.checkNotNull(listener, "listener should not be null");
+
+ LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
+
+ String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path);
+
+ final DataChangeListenerRegistrationProxy listenerRegistrationProxy =
+ new DataChangeListenerRegistrationProxy(shardName, actorContext, listener);
+ listenerRegistrationProxy.init(path, scope);
+
+ return listenerRegistrationProxy;
+ }
+
+ @Override
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
+ final YangInstanceIdentifier treeId, final L listener) {
+ Preconditions.checkNotNull(treeId, "treeId should not be null");
+ Preconditions.checkNotNull(listener, "listener should not be null");
+
+ final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId);
+ LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName);
+
+ final DataTreeChangeListenerProxy<L> listenerRegistrationProxy =
+ new DataTreeChangeListenerProxy<>(actorContext, listener);
+ listenerRegistrationProxy.init(shardName, treeId);
+
+ return listenerRegistrationProxy;
+ }
+
+
+ @Override
+ public <C extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<C> registerCommitCohort(
+ final DOMDataTreeIdentifier subtree, final C cohort) {
+ YangInstanceIdentifier treeId =
+ Preconditions.checkNotNull(subtree, "subtree should not be null").getRootIdentifier();
+ Preconditions.checkNotNull(cohort, "listener should not be null");
+
+
+ final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId);
+ LOG.debug("Registering cohort: {} for tree: {} shard: {}", cohort, treeId, shardName);
+
+ DataTreeCohortRegistrationProxy<C> cohortProxy =
+ new DataTreeCohortRegistrationProxy<>(actorContext, subtree, cohort);
+ cohortProxy.init(shardName);
+ return cohortProxy;
+ }
+
+ @Override
+ public void onGlobalContextUpdated(final SchemaContext schemaContext) {
+ actorContext.setSchemaContext(schemaContext);
+ }
+
+ @Override
+ public void onDatastoreContextUpdated(final DatastoreContextFactory contextFactory) {
+ LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreName());
+
+ actorContext.setDatastoreContext(contextFactory);
+ datastoreConfigMXBean.setContext(contextFactory.getBaseDatastoreContext());
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void close() {
+ LOG.info("Closing data store {}", identifier);
+
+ if (datastoreConfigMXBean != null) {
+ datastoreConfigMXBean.unregisterMBean();
+ }
+ if (datastoreInfoMXBean != null) {
+ datastoreInfoMXBean.unregisterMBean();
+ }
+
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ LOG.debug("Error closing instance", e);
+ }
+ }
+
+ actorContext.shutdown();
+
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ @Override
+ public ActorContext getActorContext() {
+ return actorContext;
+ }
+
+ public void waitTillReady() {
+ LOG.info("Beginning to wait for data store to become ready : {}", identifier);
+
+ try {
+ if (waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS)) {
+ LOG.debug("Data store {} is now ready", identifier);
+ } else {
+ LOG.error("Shard leaders failed to settle in {} seconds, giving up",
+ TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis));
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while waiting for shards to settle", e);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private static ActorRef createShardManager(final ActorSystem actorSystem, final ShardManagerCreator creator,
+ final String shardDispatcher, final String shardManagerId) {
+ Exception lastException = null;
+
+ for (int i = 0; i < 100; i++) {
+ try {
+ return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher).withMailbox(
+ ActorContext.BOUNDED_MAILBOX), shardManagerId);
+ } catch (Exception e) {
+ lastException = e;
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ LOG.debug("Could not create actor {} because of {} - waiting for sometime before retrying "
+ + "(retry count = {})", shardManagerId, e.getMessage(), i);
+ }
+ }
+
+ throw new IllegalStateException("Failed to create Shard Manager", lastException);
+ }
+
+ @VisibleForTesting
+ public CountDownLatch getWaitTillReadyCountDownLatch() {
+ return waitTillReadyCountDownLatch;
+ }
+}
private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
private boolean writeOnlyTransactionOptimizationsEnabled = true;
private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
+ private boolean useTellBasedProtocol = false;
private boolean transactionDebugContextEnabled = false;
private String shardManagerPersistenceId;
this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
this.shardManagerPersistenceId = other.shardManagerPersistenceId;
+ this.useTellBasedProtocol = other.useTellBasedProtocol;
setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
return transactionDebugContextEnabled;
}
+ public boolean isUseTellBasedProtocol() {
+ return useTellBasedProtocol;
+ }
+
public int getShardSnapshotChunkSize() {
return raftConfig.getSnapshotChunkSize();
}
return this;
}
+ public Builder useTellBasedProtocol(boolean value) {
+ datastoreContext.useTellBasedProtocol = value;
+ return this;
+ }
+
/**
* For unit tests only.
*/
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClientActor;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXBeanImpl;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
-import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
-import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * Implements a distributed DOMStore.
+ * Implements a distributed DOMStore using Akka Patterns.ask().
*/
-public class DistributedDataStore implements DistributedDataStoreInterface, SchemaContextListener,
- DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher,
- DOMDataTreeCommitCohortRegistry, AutoCloseable {
-
- private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
-
- private static final long READY_WAIT_FACTOR = 3;
-
- private final ActorContext actorContext;
- private final long waitTillReadyTimeInMillis;
-
- private AutoCloseable closeable;
-
- private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
-
- private DatastoreInfoMXBeanImpl datastoreInfoMXBean;
-
- private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
-
- private final ClientIdentifier identifier;
- private final DataStoreClient client;
+public class DistributedDataStore extends AbstractDataStore {
private final TransactionContextFactory txContextFactory;
- @SuppressWarnings("checkstyle:IllegalCatch")
public DistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
final Configuration configuration, final DatastoreContextFactory datastoreContextFactory,
final DatastoreSnapshot restoreFromSnapshot) {
- Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
- Preconditions.checkNotNull(cluster, "cluster should not be null");
- Preconditions.checkNotNull(configuration, "configuration should not be null");
- Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
-
- String shardManagerId = ShardManagerIdentifier.builder()
- .type(datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()).build().toString();
-
- LOG.info("Creating ShardManager : {}", shardManagerId);
-
- String shardDispatcher =
- new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
-
- PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
-
- ShardManagerCreator creator = new ShardManagerCreator().cluster(cluster).configuration(configuration)
- .datastoreContextFactory(datastoreContextFactory)
- .waitTillReadyCountDownLatch(waitTillReadyCountDownLatch)
- .primaryShardInfoCache(primaryShardInfoCache)
- .restoreFromSnapshot(restoreFromSnapshot);
-
- actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher,
- shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(),
- primaryShardInfoCache);
-
- final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(),
- datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(), actorContext);
- final ActorRef clientActor = actorSystem.actorOf(clientProps);
- try {
- client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
- } catch (Exception e) {
- LOG.error("Failed to get actor for {}", clientProps, e);
- clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- throw Throwables.propagate(e);
- }
-
- identifier = client.getIdentifier();
- LOG.debug("Distributed data store client {} started", identifier);
-
- this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout()
- .duration().toMillis() * READY_WAIT_FACTOR;
-
- this.txContextFactory = new TransactionContextFactory(actorContext, identifier);
-
- datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(
- datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
- datastoreConfigMXBean.setContext(datastoreContextFactory.getBaseDatastoreContext());
- datastoreConfigMXBean.registerMBean();
-
- datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContextFactory.getBaseDatastoreContext()
- .getDataStoreMXBeanType(), actorContext);
- datastoreInfoMXBean.registerMBean();
+ super(actorSystem, cluster, configuration, datastoreContextFactory, restoreFromSnapshot);
+ this.txContextFactory = new TransactionContextFactory(getActorContext(), getIdentifier());
}
@VisibleForTesting
DistributedDataStore(final ActorContext actorContext, final ClientIdentifier identifier) {
- this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
- this.client = null;
- this.identifier = Preconditions.checkNotNull(identifier);
- this.txContextFactory = new TransactionContextFactory(actorContext, identifier);
- this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout()
- .duration().toMillis() * READY_WAIT_FACTOR;
- }
-
- public void setCloseable(final AutoCloseable closeable) {
- this.closeable = closeable;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- ListenerRegistration<L> registerChangeListener(
- final YangInstanceIdentifier path, final L listener,
- final AsyncDataBroker.DataChangeScope scope) {
-
- Preconditions.checkNotNull(path, "path should not be null");
- Preconditions.checkNotNull(listener, "listener should not be null");
-
- LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
-
- String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path);
-
- final DataChangeListenerRegistrationProxy listenerRegistrationProxy =
- new DataChangeListenerRegistrationProxy(shardName, actorContext, listener);
- listenerRegistrationProxy.init(path, scope);
-
- return listenerRegistrationProxy;
- }
-
- @Override
- public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
- final YangInstanceIdentifier treeId, final L listener) {
- Preconditions.checkNotNull(treeId, "treeId should not be null");
- Preconditions.checkNotNull(listener, "listener should not be null");
-
- final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId);
- LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName);
-
- final DataTreeChangeListenerProxy<L> listenerRegistrationProxy =
- new DataTreeChangeListenerProxy<>(actorContext, listener);
- listenerRegistrationProxy.init(shardName, treeId);
-
- return listenerRegistrationProxy;
+ super(actorContext, identifier);
+ this.txContextFactory = new TransactionContextFactory(getActorContext(), getIdentifier());
}
- @Override
- public <C extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<C> registerCommitCohort(
- final DOMDataTreeIdentifier subtree, final C cohort) {
- YangInstanceIdentifier treeId =
- Preconditions.checkNotNull(subtree, "subtree should not be null").getRootIdentifier();
- Preconditions.checkNotNull(cohort, "listener should not be null");
-
-
- final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId);
- LOG.debug("Registering cohort: {} for tree: {} shard: {}", cohort, treeId, shardName);
-
- DataTreeCohortRegistrationProxy<C> cohortProxy =
- new DataTreeCohortRegistrationProxy<>(actorContext, subtree, cohort);
- cohortProxy.init(shardName);
- return cohortProxy;
- }
-
@Override
public DOMStoreTransactionChain createTransactionChain() {
return txContextFactory.createTransactionChain();
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- actorContext.acquireTxCreationPermit();
+ getActorContext().acquireTxCreationPermit();
return new TransactionProxy(txContextFactory, TransactionType.WRITE_ONLY);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- actorContext.acquireTxCreationPermit();
+ getActorContext().acquireTxCreationPermit();
return new TransactionProxy(txContextFactory, TransactionType.READ_WRITE);
}
@Override
- public void onGlobalContextUpdated(final SchemaContext schemaContext) {
- actorContext.setSchemaContext(schemaContext);
- }
-
- @Override
- public void onDatastoreContextUpdated(final DatastoreContextFactory contextFactory) {
- LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreName());
-
- actorContext.setDatastoreContext(contextFactory);
- datastoreConfigMXBean.setContext(contextFactory.getBaseDatastoreContext());
- }
-
- @Override
- @SuppressWarnings("checkstyle:IllegalCatch")
public void close() {
- LOG.info("Closing data store {}", identifier);
-
- if (datastoreConfigMXBean != null) {
- datastoreConfigMXBean.unregisterMBean();
- }
- if (datastoreInfoMXBean != null) {
- datastoreInfoMXBean.unregisterMBean();
- }
-
- if (closeable != null) {
- try {
- closeable.close();
- } catch (Exception e) {
- LOG.debug("Error closing instance", e);
- }
- }
-
txContextFactory.close();
- actorContext.shutdown();
-
- if (client != null) {
- client.close();
- }
- }
-
- @Override
- public ActorContext getActorContext() {
- return actorContext;
- }
-
- public void waitTillReady() {
- LOG.info("Beginning to wait for data store to become ready : {}", identifier);
-
- try {
- if (waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS)) {
- LOG.debug("Data store {} is now ready", identifier);
- } else {
- LOG.error("Shard leaders failed to settle in {} seconds, giving up",
- TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis));
- }
- } catch (InterruptedException e) {
- LOG.error("Interrupted while waiting for shards to settle", e);
- }
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- private static ActorRef createShardManager(final ActorSystem actorSystem, final ShardManagerCreator creator,
- final String shardDispatcher, final String shardManagerId) {
- Exception lastException = null;
-
- for (int i = 0; i < 100; i++) {
- try {
- return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher).withMailbox(
- ActorContext.BOUNDED_MAILBOX), shardManagerId);
- } catch (Exception e) {
- lastException = e;
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
- LOG.debug("Could not create actor {} because of {} - waiting for sometime before retrying "
- + "(retry count = {})", shardManagerId, e.getMessage(), i);
- }
- }
-
- throw new IllegalStateException("Failed to create Shard Manager", lastException);
- }
-
- @VisibleForTesting
- public CountDownLatch getWaitTillReadyCountDownLatch() {
- return waitTillReadyCountDownLatch;
+ super.close();
}
}
import akka.actor.ActorSystem;
import org.opendaylight.controller.cluster.ActorSystemProvider;
+import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
public class DistributedDataStoreFactory {
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreFactory.class);
- public static DistributedDataStore createInstance(SchemaService schemaService,
- DatastoreContext datastoreContext, DatastoreSnapshotRestore datastoreSnapshotRestore,
- ActorSystemProvider actorSystemProvider, BundleContext bundleContext) {
+ public static AbstractDataStore createInstance(final SchemaService schemaService,
+ final DatastoreContext datastoreContext, final DatastoreSnapshotRestore datastoreSnapshotRestore,
+ final ActorSystemProvider actorSystemProvider, final BundleContext bundleContext) {
LOG.info("Create data store instance of type : {}", datastoreContext.getDataStoreName());
introspector, bundleContext);
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
- final DistributedDataStore dataStore = new DistributedDataStore(actorSystem,
- new ClusterWrapperImpl(actorSystem), config, introspector.newContextFactory(), restoreFromSnapshot);
+ ClusterWrapper clusterWrapper = new ClusterWrapperImpl(actorSystem);
+ DatastoreContextFactory contextFactory = introspector.newContextFactory();
+
+ final AbstractDataStore dataStore = datastoreContext.isUseTellBasedProtocol()
+ ? new ClientBackedDataStore(actorSystem, clusterWrapper, config, contextFactory, restoreFromSnapshot) :
+ new DistributedDataStore(actorSystem, clusterWrapper, config, contextFactory, restoreFromSnapshot);
overlay.setListener(dataStore);
.transactionDebugContextEnabled(props.getTransactionDebugContextEnabled())
.customRaftPolicyImplementation(props.getCustomRaftPolicyImplementation())
.shardSnapshotChunkSize(props.getShardSnapshotChunkSize().getValue().intValue())
+ .useTellBasedProtocol(props.getUseTellBasedProtocol())
.build();
}
.transactionDebugContextEnabled(props.getTransactionDebugContextEnabled())
.customRaftPolicyImplementation(props.getCustomRaftPolicyImplementation())
.shardSnapshotChunkSize(props.getShardSnapshotChunkSize().getValue().intValue())
+ .useTellBasedProtocol(props.getUseTellBasedProtocol())
.build();
}
default 1000;
type non-zero-uint32-type;
description "The maximum queue size for each shard's data store data change notification executor.";
- }
+ }
- leaf max-shard-data-change-executor-pool-size {
+ leaf max-shard-data-change-executor-pool-size {
default 20;
type non-zero-uint32-type;
description "The maximum thread pool size for each shard's data store data change notification executor.";
- }
+ }
- leaf max-shard-data-change-listener-queue-size {
+ leaf max-shard-data-change-listener-queue-size {
default 1000;
type non-zero-uint32-type;
description "The maximum queue size for each shard's data store data change listener.";
- }
+ }
- leaf max-shard-data-store-executor-queue-size {
+ leaf max-shard-data-store-executor-queue-size {
default 5000;
type non-zero-uint32-type;
description "The maximum queue size for each shard's data store executor.";
- }
+ }
- leaf shard-transaction-idle-timeout-in-minutes {
+ leaf shard-transaction-idle-timeout-in-minutes {
default 10;
type non-zero-uint32-type;
description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.";
- }
+ }
- leaf shard-snapshot-batch-count {
+ leaf shard-snapshot-batch-count {
default 20000;
type non-zero-uint32-type;
description "The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.";
- }
+ }
- leaf shard-snapshot-data-threshold-percentage {
+ leaf shard-snapshot-data-threshold-percentage {
default 12;
type percentage;
description "The percentage of Runtime.maxMemory() used by the in-memory journal log before a snapshot is to be taken";
- }
+ }
- leaf shard-heartbeat-interval-in-millis {
+ leaf shard-heartbeat-interval-in-millis {
default 500;
type heartbeat-interval-type;
description "The interval at which a shard will send a heart beat message to its remote shard.";
- }
+ }
- leaf shard-election-timeout-factor {
+ leaf shard-election-timeout-factor {
default 20;
type non-zero-uint32-type;
description "The multiplication factor to be used to determine shard election timeout. The shard election timeout
is determined by multiplying shard-heartbeat-interval-in-millis with the shard-election-timeout-factor";
- }
+ }
- leaf operation-timeout-in-seconds {
+ leaf operation-timeout-in-seconds {
default 5;
type operation-timeout-type;
description "The maximum amount of time for akka operations (remote or local) to complete before failing.";
- }
+ }
- leaf shard-journal-recovery-log-batch-size {
+ leaf shard-journal-recovery-log-batch-size {
default 1;
type non-zero-uint32-type;
description "The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.";
- }
+ }
- leaf shard-transaction-commit-timeout-in-seconds {
+ leaf shard-transaction-commit-timeout-in-seconds {
default 30;
type non-zero-uint32-type;
description "The maximum amount of time a shard transaction three-phase commit can be idle without receiving the next messages before it aborts the transaction";
- }
+ }
- leaf shard-transaction-commit-queue-capacity {
+ leaf shard-transaction-commit-queue-capacity {
default 50000;
type non-zero-uint32-type;
description "The maximum allowed capacity for each shard's transaction commit queue.";
- }
-
- leaf shard-commit-queue-expiry-timeout-in-seconds {
- default 120; // 2 minutes
- type non-zero-uint32-type;
- description "The maximum amount of time a transaction can remain in a shard's commit queue waiting
- to begin the CanCommit phase as coordinated by the broker front-end. Normally this should be
- quick but latencies can occur in between transaction ready and CanCommit or a remote broker
- could lose connection and CanCommit might never occur. Expiring transactions from the queue
- allows subsequent pending transaction to be processed.";
- }
-
- leaf shard-initialization-timeout-in-seconds {
+ }
+
+ leaf shard-commit-queue-expiry-timeout-in-seconds {
+ default 120; // 2 minutes
+ type non-zero-uint32-type;
+ description "The maximum amount of time a transaction can remain in a shard's commit queue waiting
+ to begin the CanCommit phase as coordinated by the broker front-end. Normally this should be
+ quick but latencies can occur in between transaction ready and CanCommit or a remote broker
+ could lose connection and CanCommit might never occur. Expiring transactions from the queue
+ allows subsequent pending transaction to be processed.";
+ }
+
+ leaf shard-initialization-timeout-in-seconds {
default 300; // 5 minutes
type non-zero-uint32-type;
description "The maximum amount of time to wait for a shard to initialize from persistence
on startup before failing an operation (eg transaction create and change
listener registration).";
- }
+ }
- leaf shard-leader-election-timeout-in-seconds {
+ leaf shard-leader-election-timeout-in-seconds {
default 30;
type non-zero-uint32-type;
description "The maximum amount of time to wait for a shard to elect a leader before failing
an operation (eg transaction create).";
- }
+ }
- leaf shard-batched-modification-count {
+ leaf shard-batched-modification-count {
default 1000;
type non-zero-uint32-type;
description "The number of transaction modification operations (put, merge, delete) to
batch before sending to the shard transaction actor. Batching improves
performance as less modifications messages are sent to the actor and thus
lessens the chance that the transaction actor's mailbox queue could get full.";
- }
+ }
- leaf enable-metric-capture {
+ leaf enable-metric-capture {
default false;
type boolean;
description "Enable or disable metric capture.";
- }
+ }
- leaf bounded-mailbox-capacity {
- default 1000;
- type non-zero-uint32-type;
- description "Max queue size that an actor's mailbox can reach";
- }
+ leaf bounded-mailbox-capacity {
+ default 1000;
+ type non-zero-uint32-type;
+ description "Max queue size that an actor's mailbox can reach";
+ }
- leaf persistent {
+ leaf persistent {
default true;
type boolean;
description "Enable or disable data persistence";
- }
+ }
- leaf shard-isolated-leader-check-interval-in-millis {
+ leaf shard-isolated-leader-check-interval-in-millis {
default 5000;
type heartbeat-interval-type;
description "The interval at which the leader of the shard will check if its majority
followers are active and term itself as isolated";
- }
+ }
- leaf transaction-creation-initial-rate-limit {
+ leaf transaction-creation-initial-rate-limit {
default 100;
type non-zero-uint32-type;
description "The initial number of transactions per second that are allowed before the data store
should begin applying back pressure. This number is only used as an initial guidance,
subsequently the datastore measures the latency for a commit and auto-adjusts the rate limit";
- }
-
- leaf transaction-debug-context-enabled {
- default false;
- type boolean;
- description "Enable or disable transaction context debug. This will log the call site trace for
- transactions that fail";
- }
-
- leaf custom-raft-policy-implementation {
+ }
+
+ leaf transaction-debug-context-enabled {
+ default false;
+ type boolean;
+ description "Enable or disable transaction context debug. This will log the call site trace for
+ transactions that fail";
+ }
+
+ leaf custom-raft-policy-implementation {
default "";
type string;
description "A fully qualified java class name. The class should implement
reflection. For now let's assume that these classes to customize raft behaviors should be
present in the distributed data store module itself. If this property is set to a class which
cannot be found then the default raft behavior will be applied";
- }
+ }
- leaf shard-snapshot-chunk-size {
+ leaf shard-snapshot-chunk-size {
default 2048000;
type non-zero-uint32-type;
- description "When sending a snapshot to a follower, this is the maximum size in bytes for
+ description "When sending a snapshot to a follower, this is the maximum size in bytes for
a chunk of data.";
- }
+ }
+
+ leaf use-tell-based-protocol {
+ default false;
+ type boolean;
+ description "Use a newer protocol between the frontend and backend. This feature is considered
+ exprerimental at this point.";
+ }
}
// Augments the 'configuration' choice node under modules/module.
ArgumentCaptor<DOMDataTreeCandidate> candidateCapt = ArgumentCaptor.forClass(DOMDataTreeCandidate.class);
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
- try (final DistributedDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest",
+ try (final AbstractDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest",
"test-1")) {
final ObjectRegistration<DOMDataTreeCommitCohort> cohortReg =
dataStore.registerCommitCohort(TEST_ID, cohort);
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
- try (final DistributedDataStore dataStore =
+ try (final AbstractDataStore dataStore =
setupDistributedDataStore("transactionIntegrationTest", "test-1")) {
dataStore.registerCommitCohort(TEST_ID, failedCohort);
Thread.sleep(1000); // Registration is asynchronous
Mockito.doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(stepToAbort).abort();
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
- try (final DistributedDataStore dataStore =
+ try (final AbstractDataStore dataStore =
setupDistributedDataStore("transactionIntegrationTest", "test-1")) {
dataStore.registerCommitCohort(TEST_ID, cohortToAbort);
Thread.sleep(1000); // Registration is asynchronous
public void testWriteTransactionWithSingleShard() throws Exception {
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
- try (DistributedDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest",
+ try (AbstractDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest",
"test-1")) {
testWriteTransaction(dataStore, TestModel.TEST_PATH,
public void testWriteTransactionWithMultipleShards() throws Exception {
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
- try (DistributedDataStore dataStore = setupDistributedDataStore(
+ try (AbstractDataStore dataStore = setupDistributedDataStore(
"testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
System.setProperty("shard.persistent", "true");
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
- try (DistributedDataStore dataStore = setupDistributedDataStore(
+ try (AbstractDataStore dataStore = setupDistributedDataStore(
"testReadWriteTransactionWithSingleShard", "test-1")) {
// 1. Create a read-write Tx
public void testReadWriteTransactionWithMultipleShards() throws Exception {
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
- try (DistributedDataStore dataStore = setupDistributedDataStore(
+ try (AbstractDataStore dataStore = setupDistributedDataStore(
"testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
- try (DistributedDataStore dataStore = setupDistributedDataStore(
+ try (AbstractDataStore dataStore = setupDistributedDataStore(
"testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
- try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
+ try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
// Create the write Tx
CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
- try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
+ try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
// Create the read-write Tx
InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
- try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
+ try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
// Create the write Tx
InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
- try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
+ try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
// Create the read-write Tx
datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
.shardInitializationTimeout(200, TimeUnit.MILLISECONDS);
- try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
+ try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
Object result = dataStore.getActorContext().executeOperation(
dataStore.getActorContext().getShardManager(), new FindLocalShard(shardName, true));
public void testTransactionAbort() throws Exception {
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
- try (DistributedDataStore dataStore = setupDistributedDataStore("transactionAbortIntegrationTest",
+ try (AbstractDataStore dataStore = setupDistributedDataStore("transactionAbortIntegrationTest",
"test-1")) {
DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
public void testTransactionChainWithSingleShard() throws Exception {
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
- try (DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard",
+ try (AbstractDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard",
"test-1")) {
// 1. Create a Tx chain and write-only Tx
public void testTransactionChainWithMultipleShards() throws Exception {
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
- try (DistributedDataStore dataStore = setupDistributedDataStore(
+ try (AbstractDataStore dataStore = setupDistributedDataStore(
"testTransactionChainWithMultipleShards", "cars-1", "people-1")) {
DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
public void testCreateChainedTransactionsInQuickSuccession() throws Exception {
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
- try (DistributedDataStore dataStore = setupDistributedDataStore(
+ try (AbstractDataStore dataStore = setupDistributedDataStore(
"testCreateChainedTransactionsInQuickSuccession", "cars-1")) {
ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception {
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
- try (DistributedDataStore dataStore = setupDistributedDataStore(
+ try (AbstractDataStore dataStore = setupDistributedDataStore(
"testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) {
DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception {
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
- try (DistributedDataStore dataStore = setupDistributedDataStore(
+ try (AbstractDataStore dataStore = setupDistributedDataStore(
"testCreateChainedTransactionWhenPreviousNotReady", "test-1")) {
final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
public void testCreateChainedTransactionAfterClose() throws Exception {
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
- try (DistributedDataStore dataStore = setupDistributedDataStore(
+ try (AbstractDataStore dataStore = setupDistributedDataStore(
"testCreateChainedTransactionAfterClose", "test-1")) {
DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception {
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
- try (DistributedDataStore dataStore = setupDistributedDataStore(
+ try (AbstractDataStore dataStore = setupDistributedDataStore(
"testChainWithReadOnlyTxAfterPreviousReady", "test-1")) {
final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
public void testChainedTransactionFailureWithSingleShard() throws Exception {
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
- try (DistributedDataStore dataStore = setupDistributedDataStore(
+ try (AbstractDataStore dataStore = setupDistributedDataStore(
"testChainedTransactionFailureWithSingleShard", "cars-1")) {
ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
public void testChainedTransactionFailureWithMultipleShards() throws Exception {
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
- try (DistributedDataStore dataStore = setupDistributedDataStore(
+ try (AbstractDataStore dataStore = setupDistributedDataStore(
"testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) {
ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
public void testChangeListenerRegistration() throws Exception {
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
- try (DistributedDataStore dataStore = setupDistributedDataStore("testChangeListenerRegistration",
+ try (AbstractDataStore dataStore = setupDistributedDataStore("testChangeListenerRegistration",
"test-1")) {
testWriteTransaction(dataStore, TestModel.TEST_PATH,
new DatastoreSnapshot.ShardSnapshot("people",
org.apache.commons.lang3.SerializationUtils.serialize(peopleSnapshot))));
- try (DistributedDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
+ try (AbstractDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
true, "cars", "people")) {
DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
private final TransactionIdentifier tx1 = nextTransactionId();
private final TransactionIdentifier tx2 = nextTransactionId();
- private DistributedDataStore followerDistributedDataStore;
- private DistributedDataStore leaderDistributedDataStore;
+ private AbstractDataStore followerDistributedDataStore;
+ private AbstractDataStore leaderDistributedDataStore;
private IntegrationTestKit followerTestKit;
private IntegrationTestKit leaderTestKit;
JavaTestKit.shutdownActorSystem(follower2System);
}
- private void initDatastoresWithCars(String type) {
+ private void initDatastoresWithCars(final String type) {
initDatastores(type, MODULE_SHARDS_CARS_ONLY_1_2, CARS);
}
- private void initDatastoresWithCarsAndPeople(String type) {
+ private void initDatastoresWithCarsAndPeople(final String type) {
initDatastores(type, MODULE_SHARDS_CARS_PEOPLE_1_2, CARS_AND_PEOPLE);
}
- private void initDatastores(String type, String moduleShardsConfig, String[] shards) {
+ private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards) {
leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, shards);
leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards);
}
- private static void verifyCars(DOMStoreReadTransaction readTx, MapEntryNode... entries) throws Exception {
+ private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries)
+ throws Exception {
Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
assertEquals("isPresent", true, optional.isPresent());
assertEquals("Car list node", listBuilder.build(), optional.get());
}
- private static void verifyNode(DOMStoreReadTransaction readTx, YangInstanceIdentifier path,
- NormalizedNode<?, ?> expNode) throws Exception {
+ private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path,
+ final NormalizedNode<?, ?> expNode) throws Exception {
Optional<NormalizedNode<?, ?>> optional = readTx.read(path).get(5, TimeUnit.SECONDS);
assertEquals("isPresent", true, optional.isPresent());
assertEquals("Data node", expNode, optional.get());
}
- private static void verifyExists(DOMStoreReadTransaction readTx, YangInstanceIdentifier path) throws Exception {
+ private static void verifyExists(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path)
+ throws Exception {
Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS);
assertEquals("exists", true, exists);
}
ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load().getConfig("Member2"));
- try (DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder)
+ try (AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder)
.setupDistributedDataStore(testName, "module-shards-member2", true, CARS_AND_PEOPLE)) {
verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
}
.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
- try (DistributedDataStore ds =
+ try (AbstractDataStore ds =
newMember1TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) {
followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), CARS);
initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE);
IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder);
- try (DistributedDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName,
+ try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName,
MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) {
// Create and submit a couple tx's so they're pending.
.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, follower2DatastoreContextBuilder);
- try (DistributedDataStore ds =
+ try (AbstractDataStore ds =
follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS)) {
followerTestKit.waitForMembersUp("member-1", "member-3");
}
}
- private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) {
+ private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {
final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
Answer<DatastoreContext> answer = invocation -> newBuilder.build();
protected DatastoreContext.Builder datastoreContextBuilder;
protected DatastoreSnapshot restoreFromSnapshot;
- public IntegrationTestKit(ActorSystem actorSystem, Builder datastoreContextBuilder) {
+ public IntegrationTestKit(final ActorSystem actorSystem, final Builder datastoreContextBuilder) {
super(actorSystem);
this.datastoreContextBuilder = datastoreContextBuilder;
}
return datastoreContextBuilder;
}
- public DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
+ public AbstractDataStore setupDistributedDataStore(final String typeName, final String... shardNames) {
return setupDistributedDataStore(typeName, "module-shards.conf", true, SchemaContextHelper.full(), shardNames);
}
- public DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader,
- String... shardNames) {
+ public AbstractDataStore setupDistributedDataStore(final String typeName, final boolean waitUntilLeader,
+ final String... shardNames) {
return setupDistributedDataStore(typeName, "module-shards.conf", waitUntilLeader,
SchemaContextHelper.full(), shardNames);
}
- public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
+ public AbstractDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
final boolean waitUntilLeader, final String... shardNames) {
return setupDistributedDataStore(typeName, moduleShardsConfig, waitUntilLeader,
SchemaContextHelper.full(), shardNames);
}
- public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
+ public AbstractDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
final boolean waitUntilLeader, final SchemaContext schemaContext, final String... shardNames) {
final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
final Configuration config = new ConfigurationImpl(moduleShardsConfig, "modules.conf");
Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
- DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, mockContextFactory,
+ AbstractDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, mockContextFactory,
restoreFromSnapshot);
dataStore.onGlobalContextUpdated(schemaContext);
return dataStore;
}
- public void waitUntilLeader(ActorContext actorContext, String... shardNames) {
+ public void waitUntilLeader(final ActorContext actorContext, final String... shardNames) {
for (String shardName: shardNames) {
ActorRef shard = findLocalShard(actorContext, shardName);
}
}
- public void waitUntilNoLeader(ActorContext actorContext, String... shardNames) {
+ public void waitUntilNoLeader(final ActorContext actorContext, final String... shardNames) {
for (String shardName: shardNames) {
ActorRef shard = findLocalShard(actorContext, shardName);
assertNotNull("No local shard found for " + shardName, shard);
}
}
- public void waitForMembersUp(String... otherMembers) {
+ public void waitForMembersUp(final String... otherMembers) {
Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
Stopwatch sw = Stopwatch.createStarted();
while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
fail("Member(s) " + otherMembersSet + " are not Up");
}
- public static ActorRef findLocalShard(ActorContext actorContext, String shardName) {
+ public static ActorRef findLocalShard(final ActorContext actorContext, final String shardName) {
ActorRef shard = null;
for (int i = 0; i < 20 * 5 && shard == null; i++) {
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
return shard;
}
- public static void verifyShardStats(DistributedDataStore datastore, String shardName, ShardStatsVerifier verifier)
- throws Exception {
+ public static void verifyShardStats(final AbstractDataStore datastore, final String shardName,
+ final ShardStatsVerifier verifier) throws Exception {
ActorContext actorContext = datastore.getActorContext();
Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
throw lastError;
}
- void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
- NormalizedNode<?, ?> nodeToWrite) throws Exception {
+ void testWriteTransaction(final AbstractDataStore dataStore, final YangInstanceIdentifier nodePath,
+ final NormalizedNode<?, ?> nodeToWrite) throws Exception {
// 1. Create a write-only Tx
}
@SuppressWarnings("checkstyle:IllegalCatch")
- void assertExceptionOnCall(Callable<Void> callable, Class<? extends Exception> expType)
+ void assertExceptionOnCall(final Callable<Void> callable, final Class<? extends Exception> expType)
throws Exception {
try {
callable.call();
}
void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
- Class<? extends Exception> expType) throws Exception {
- assertExceptionOnCall(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- txChain.newWriteOnlyTransaction();
- return null;
- }
+ final Class<? extends Exception> expType) throws Exception {
+ assertExceptionOnCall(() -> {
+ txChain.newWriteOnlyTransaction();
+ return null;
}, expType);
assertExceptionOnCall(() -> {
static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
private IntegrationTestKit kit;
- private DistributedDataStore configDataStore;
- private DistributedDataStore operDataStore;
+ private AbstractDataStore configDataStore;
+ private AbstractDataStore operDataStore;
private DatastoreContext.Builder datastoreContextBuilder;
private boolean cleanedUp;
* callers to cleanup instances on test completion.
* @return a Builder instance
*/
- public static Builder builder(List<MemberNode> members) {
+ public static Builder builder(final List<MemberNode> members) {
return new Builder(members);
}
}
- public DistributedDataStore configDataStore() {
+ public AbstractDataStore configDataStore() {
return configDataStore;
}
- public DistributedDataStore operDataStore() {
+ public AbstractDataStore operDataStore() {
return operDataStore;
}
return datastoreContextBuilder;
}
- public void waitForMembersUp(String... otherMembers) {
+ public void waitForMembersUp(final String... otherMembers) {
kit.waitForMembersUp(otherMembers);
}
- public void waitForMemberDown(String member) {
+ public void waitForMemberDown(final String member) {
Stopwatch sw = Stopwatch.createStarted();
while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
CurrentClusterState state = Cluster.get(kit.getSystem()).state();
}
}
- public static void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier)
- throws Exception {
+ public static void verifyRaftState(final AbstractDataStore datastore, final String shardName,
+ final RaftStateVerifier verifier) throws Exception {
ActorContext actorContext = datastore.getActorContext();
Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
throw lastError;
}
- public static void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName,
- String... peerMemberNames) throws Exception {
+ public static void verifyRaftPeersPresent(final AbstractDataStore datastore, final String shardName,
+ final String... peerMemberNames) throws Exception {
final Set<String> peerIds = Sets.newHashSet();
for (String p: peerMemberNames) {
peerIds.add(ShardIdentifier.create(shardName, MemberName.forName(p),
raftState.getPeerAddresses().keySet()));
}
- public static void verifyNoShardPresent(DistributedDataStore datastore, String shardName) {
+ public static void verifyNoShardPresent(final AbstractDataStore datastore, final String shardName) {
Stopwatch sw = Stopwatch.createStarted();
while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
Optional<ActorRef> shardReply = datastore.getActorContext().findLocalShard(shardName);
private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
.shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
- Builder(List<MemberNode> members) {
+ Builder(final List<MemberNode> members) {
this.members = members;
}
*
* @return this Builder
*/
- public Builder moduleShardsConfig(String newModuleShardsConfig) {
+ public Builder moduleShardsConfig(final String newModuleShardsConfig) {
this.moduleShardsConfig = newModuleShardsConfig;
return this;
}
*
* @return this Builder
*/
- public Builder akkaConfig(String newAkkaConfig) {
+ public Builder akkaConfig(final String newAkkaConfig) {
this.akkaConfig = newAkkaConfig;
return this;
}
*
* @return this Builder
*/
- public Builder testName(String newTestName) {
+ public Builder testName(final String newTestName) {
this.testName = newTestName;
return this;
}
*
* @return this Builder
*/
- public Builder waitForShardLeader(String... shardNames) {
+ public Builder waitForShardLeader(final String... shardNames) {
this.waitForshardLeader = shardNames;
return this;
}
*
* @return this Builder
*/
- public Builder createOperDatastore(boolean value) {
+ public Builder createOperDatastore(final boolean value) {
this.createOperDatastore = value;
return this;
}
*
* @return this Builder
*/
- public Builder schemaContext(SchemaContext newSchemaContext) {
+ public Builder schemaContext(final SchemaContext newSchemaContext) {
this.schemaContext = newSchemaContext;
return this;
}
*
* @return this Builder
*/
- public Builder datastoreContextBuilder(DatastoreContext.Builder builder) {
+ public Builder datastoreContextBuilder(final DatastoreContext.Builder builder) {
datastoreContextBuilder = builder;
return this;
}
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
import org.opendaylight.controller.cluster.datastore.MemberNode;
import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
memberNodes.clear();
}
- private static DistributedEntityOwnershipService newOwnershipService(final DistributedDataStore datastore) {
+ private static DistributedEntityOwnershipService newOwnershipService(final AbstractDataStore datastore) {
return DistributedEntityOwnershipService.start(datastore.getActorContext(),
EntityOwnerSelectionStrategyConfig.newBuilder().build());
}
.moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
.datastoreContextBuilder(followerDatastoreContextBuilder).build();
- DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+ AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
leaderDistributedDataStore.waitTillReady();
follower1Node.configDataStore().waitTillReady();
.moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
.datastoreContextBuilder(followerDatastoreContextBuilder).build();
- DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+ AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
leaderDistributedDataStore.waitTillReady();
follower1Node.configDataStore().waitTillReady();
.moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
.createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
- DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+ AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
leaderDistributedDataStore.waitTillReady();
follower1Node.configDataStore().waitTillReady();
.moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
.datastoreContextBuilder(followerDatastoreContextBuilder).build();
- DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+ AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
leaderDistributedDataStore.waitTillReady();
follower1Node.configDataStore().waitTillReady();
.moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
.datastoreContextBuilder(leaderDatastoreContextBuilder).build();
- DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+ AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
.moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
.datastoreContextBuilder(followerDatastoreContextBuilder).build();
- DistributedDataStore follower1DistributedDataStore = follower1Node.configDataStore();
+ AbstractDataStore follower1DistributedDataStore = follower1Node.configDataStore();
follower1DistributedDataStore.waitTillReady();
leaderNode.waitForMembersUp("member-2");
.moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
.datastoreContextBuilder(followerDatastoreContextBuilder).build();
- DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+ AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
leaderDistributedDataStore.waitTillReady();
follower1Node.configDataStore().waitTillReady();
.moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
.datastoreContextBuilder(followerDatastoreContextBuilder).build();
- DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+ AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
leaderDistributedDataStore.waitTillReady();
follower1Node.configDataStore().waitTillReady();
assertEquals("EntityOwnershipState", expState, state.get());
}
- private static void verifyCandidates(final DistributedDataStore dataStore, final DOMEntity entity,
+ private static void verifyCandidates(final AbstractDataStore dataStore, final DOMEntity entity,
final String... expCandidates) throws Exception {
AssertionError lastError = null;
Stopwatch sw = Stopwatch.createStarted();
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private static void verifyOwner(final DistributedDataStore dataStore, final DOMEntity entity,
+ private static void verifyOwner(final AbstractDataStore dataStore, final DOMEntity entity,
final String expOwner) {
AbstractEntityOwnershipTest.verifyOwner(expOwner, entity.getType(), entity.getIdentifier(), path -> {
try {
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
static int ID_COUNTER = 1;
private final String dataStoreName = "config" + ID_COUNTER++;
- private DistributedDataStore dataStore;
+ private AbstractDataStore dataStore;
@Before
public void setUp() {