Remove ask-based protocol client 67/108867/13
authorbentom-binoy <bentom.binoy@infosys.com>
Tue, 7 Nov 2023 06:44:41 +0000 (06:44 +0000)
committerRobert Varga <robert.varga@pantheon.tech>
Sat, 30 Dec 2023 21:16:22 +0000 (22:16 +0100)
Remove the use-tell-based-protocol knob, effectively making it always
true.

This means DistributedDataStore cannot be instantiated, which leads to
its removal. That in turn makes all of DatastoreContext,
OperationLimiter and similar classes superfluous, so we remove those as
well.

A few classes are used to drive the shard backend in integration tests,
and hence those are moved to test sources.

JIRA: CONTROLLER-2054
Change-Id: Ie20b1c898576d3c89b70b34121310e58faddbf8e
Signed-off-by: bentom-binoy <bentom.binoy@infosys.com>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
52 files changed:
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDatastoreContextIntrospectorFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractThreePhaseCommitCohort.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextWrapper.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DebugThreePhaseCommitCohort.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedTransactionContextWrapper.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DirectTransactionContextWrapper.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalThreePhaseCommitCohort.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionReadySupport.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDOMStoreThreePhaseCommitCohort.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SingleCommitCohortProxy.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextCleanup.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextFactory.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionModificationOperation.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionOperation.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospectorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DebugThreePhaseCommitCohortTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DelayedTransactionContextWrapperTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DirectTransactionContextWrapperTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/JsonExportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCallback.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCallback.java with 62% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java with 94% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallback.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallback.java with 90% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallbackTest.java

index 272d03a19696625fc8d3518555feed2ff01a1458..b03a4a114e0f87a8cd06c913ebce9cb2ec4fb0fc 100644 (file)
@@ -89,12 +89,6 @@ operational.persistent=false
 # for a message slice. This needs to be below Akka's maximum-frame-size and defaults to 480KiB.
 maximum-message-slice-size=491520
 
-# Enable tell-based protocol between frontend (applications) and backend (shards). Using this protocol
-# should avoid AskTimeoutExceptions seen under heavy load. Defaults to false (use tell-based protocol).
-# Set to false to enable ask-based protocol.
-# This option is obsolete and will be removed in the next major release.
-use-tell-based-protocol=true
-
 # Tune the maximum number of entries a follower is allowed to lag behind the leader before it is
 # considered out-of-sync. This flag may require tuning in face of a large number of small transactions.
 #sync-index-threshold=10
index 4aa075f3d3fb492a6be54fff46091190703a73ef..75bd53ade65b8776f7b7430423e6bcd662d6a4fd 100644 (file)
@@ -12,7 +12,7 @@ import java.util.Map;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStorePropertiesContainer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStorePropertiesContainer;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractThreePhaseCommitCohort.java
deleted file mode 100644 (file)
index 49b398f..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import java.util.List;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.common.Empty;
-import scala.concurrent.Future;
-
-/**
- * Abstract base class for {@link DOMStoreThreePhaseCommitCohort} instances returned by this
- * implementation. In addition to the usual set of methods it also contains the list of actor
- * futures.
- */
-public abstract class AbstractThreePhaseCommitCohort<T> implements DOMStoreThreePhaseCommitCohort {
-    protected static final @NonNull ListenableFuture<Empty> IMMEDIATE_EMPTY_SUCCESS =
-        Futures.immediateFuture(Empty.value());
-    protected static final @NonNull ListenableFuture<Boolean> IMMEDIATE_BOOLEAN_SUCCESS =
-        Futures.immediateFuture(Boolean.TRUE);
-
-    abstract List<Future<T>> getCohortFutures();
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java
deleted file mode 100644 (file)
index fa77b5f..0000000
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import java.util.Collection;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-import scala.util.Try;
-
-/**
- * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
- * transaction factories.
- */
-abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
-    @SuppressWarnings("rawtypes")
-    private static final AtomicLongFieldUpdater<AbstractTransactionContextFactory> TX_COUNTER_UPDATER =
-            AtomicLongFieldUpdater.newUpdater(AbstractTransactionContextFactory.class, "nextTx");
-
-    private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
-    private final @NonNull LocalHistoryIdentifier historyId;
-    private final @NonNull ActorUtils actorUtils;
-
-    // Used via TX_COUNTER_UPDATER
-    @SuppressWarnings("unused")
-    private volatile long nextTx;
-
-    protected AbstractTransactionContextFactory(final ActorUtils actorUtils, final LocalHistoryIdentifier historyId) {
-        this.actorUtils = requireNonNull(actorUtils);
-        this.historyId = requireNonNull(historyId);
-    }
-
-    final ActorUtils getActorUtils() {
-        return actorUtils;
-    }
-
-    final LocalHistoryIdentifier getHistoryId() {
-        return historyId;
-    }
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent,
-            final String shardName) {
-        final LocalTransactionFactory local = knownLocal.get(shardName);
-        if (local != null) {
-            LOG.debug("Tx {} - Creating local component for shard {} using factory {}", parent.getIdentifier(),
-                shardName, local);
-
-            try {
-                return createLocalTransactionContext(local, parent);
-            } catch (Exception e) {
-                return new NoOpTransactionContext(e, parent.getIdentifier());
-            }
-        }
-
-        return null;
-    }
-
-    private AbstractTransactionContextWrapper maybeCreateDirectTransactionContextWrapper(
-            final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
-            final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
-        LOG.debug("Tx {}: Found primary {} for shard {}, trying to use DirectTransactionContextWrapper",
-                parent.getIdentifier(), primaryShardInfo.getPrimaryShardActor(), shardName);
-
-        updateShardInfo(shardName, primaryShardInfo);
-
-        final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
-        try {
-            if (localContext != null) {
-                LOG.debug("Tx {}: Local transaction context created successfully, using DirectTransactionWrapper",
-                        parent.getIdentifier());
-                return new DirectTransactionContextWrapper(parent.getIdentifier(), actorUtils, shardName,
-                        localContext);
-            }
-
-            LOG.debug("Tx {}: Local transaction context creation failed, using DelayedTransactionWrapper",
-                parent.getIdentifier());
-            final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
-                transactionContextWrapper, parent, shardName);
-            remote.setPrimaryShard(primaryShardInfo);
-            return transactionContextWrapper;
-        } finally {
-            onTransactionContextCreated(parent.getIdentifier());
-        }
-    }
-
-    private void onFindPrimaryShardSuccess(final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
-            final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
-        LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
-                primaryShardInfo.getPrimaryShardActor(), shardName);
-
-        updateShardInfo(shardName, primaryShardInfo);
-
-        final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
-        try {
-            if (localContext != null) {
-                transactionContextWrapper.executePriorTransactionOperations(localContext);
-            } else {
-                final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
-                        transactionContextWrapper, parent, shardName);
-                remote.setPrimaryShard(primaryShardInfo);
-            }
-        } finally {
-            onTransactionContextCreated(parent.getIdentifier());
-        }
-    }
-
-    private void onFindPrimaryShardFailure(final Throwable failure, final TransactionProxy parent,
-            final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
-        LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
-
-        try {
-            transactionContextWrapper.executePriorTransactionOperations(
-                    new NoOpTransactionContext(failure, parent.getIdentifier()));
-        } finally {
-            onTransactionContextCreated(parent.getIdentifier());
-        }
-    }
-
-    final AbstractTransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
-            final String shardName) {
-        final DelayedTransactionContextWrapper contextWrapper = new DelayedTransactionContextWrapper(
-                parent.getIdentifier(), actorUtils, shardName);
-        final Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
-        if (findPrimaryFuture.isCompleted()) {
-            final Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
-            if (maybe.isSuccess()) {
-                return maybeCreateDirectTransactionContextWrapper(maybe.get(), parent, shardName, contextWrapper);
-            }
-
-            onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, contextWrapper);
-        } else {
-            findPrimaryFuture.onComplete(result -> {
-                if (result.isSuccess()) {
-                    onFindPrimaryShardSuccess(result.get(), parent, shardName, contextWrapper);
-                } else {
-                    onFindPrimaryShardFailure(result.failed().get(), parent, shardName, contextWrapper);
-                }
-                return null;
-            }, actorUtils.getClientDispatcher());
-        }
-        return contextWrapper;
-    }
-
-    private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
-        final Optional<ReadOnlyDataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
-        if (maybeDataTree.isPresent()) {
-            if (!knownLocal.containsKey(shardName)) {
-                LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
-
-                F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(),
-                    maybeDataTree.orElseThrow());
-                knownLocal.putIfAbsent(shardName, factory);
-            }
-        } else if (knownLocal.containsKey(shardName)) {
-            LOG.debug("Shard {} invalidating local data tree", shardName);
-
-            knownLocal.remove(shardName);
-        }
-    }
-
-    protected final MemberName getMemberName() {
-        return historyId.getClientId().getFrontendId().getMemberName();
-    }
-
-    /**
-     * Create an identifier for the next TransactionProxy attached to this component
-     * factory.
-     * @return Transaction identifier, may not be null.
-     */
-    protected final TransactionIdentifier nextIdentifier() {
-        return new TransactionIdentifier(historyId, TX_COUNTER_UPDATER.getAndIncrement(this));
-    }
-
-    /**
-     * Find the primary shard actor.
-     *
-     * @param shardName Shard name
-     * @return Future containing shard information.
-     */
-    protected abstract Future<PrimaryShardInfo> findPrimaryShard(@NonNull String shardName,
-            @NonNull TransactionIdentifier txId);
-
-    /**
-     * Create local transaction factory for specified shard, backed by specified shard leader
-     * and data tree instance.
-     *
-     * @param shardName the shard name
-     * @param shardLeader the shard leader
-     * @param dataTree Backing data tree instance. The data tree may only be accessed in
-     *                 read-only manner.
-     * @return Transaction factory for local use.
-     */
-    protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, ReadOnlyDataTree dataTree);
-
-    /**
-     * Callback invoked from child transactions to push any futures, which need to
-     * be waited for before the next transaction is allocated.
-     * @param cohortFutures Collection of futures
-     */
-    protected abstract <T> void onTransactionReady(@NonNull TransactionIdentifier transaction,
-            @NonNull Collection<Future<T>> cohortFutures);
-
-    /**
-     * Callback invoked when the internal TransactionContext has been created for a transaction.
-     *
-     * @param transactionId the ID of the transaction.
-     */
-    protected abstract void onTransactionContextCreated(@NonNull TransactionIdentifier transactionId);
-
-    private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
-                                                                    final TransactionProxy parent) {
-        return switch (parent.getType()) {
-            case READ_ONLY -> {
-                final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
-                yield new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
-                    @Override
-                    DOMStoreWriteTransaction getWriteDelegate() {
-                        throw new UnsupportedOperationException();
-                    }
-
-                    @Override
-                    DOMStoreReadTransaction getReadDelegate() {
-                        return readOnly;
-                    }
-                };
-            }
-            case READ_WRITE -> {
-                final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
-                yield new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
-                    @Override
-                    DOMStoreWriteTransaction getWriteDelegate() {
-                        return readWrite;
-                    }
-
-                    @Override
-                    DOMStoreReadTransaction getReadDelegate() {
-                        return readWrite;
-                    }
-                };
-            }
-            case WRITE_ONLY -> {
-                final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
-                yield new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
-                    @Override
-                    DOMStoreWriteTransaction getWriteDelegate() {
-                        return writeOnly;
-                    }
-
-                    @Override
-                    DOMStoreReadTransaction getReadDelegate() {
-                        throw new UnsupportedOperationException();
-                    }
-                };
-            }
-            default -> throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
-        };
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextWrapper.java
deleted file mode 100644 (file)
index 49dac87..0000000
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import java.util.Optional;
-import java.util.SortedSet;
-import java.util.concurrent.TimeUnit;
-import org.eclipse.jdt.annotation.NonNull;
-import org.eclipse.jdt.annotation.Nullable;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import scala.concurrent.Future;
-
-/**
- * A helper class that wraps an eventual TransactionContext instance. We have two specializations:
- * <ul>
- *   <li>{@link DelayedTransactionContextWrapper}, which enqueues operations towards the backend</li>
- *   <li>{@link DirectTransactionContextWrapper}, which sends operations to the backend</li>
- * </ul>
- */
-abstract class AbstractTransactionContextWrapper {
-    private final TransactionIdentifier identifier;
-    private final OperationLimiter limiter;
-    private final String shardName;
-
-    AbstractTransactionContextWrapper(@NonNull final TransactionIdentifier identifier,
-                                      @NonNull final ActorUtils actorUtils, @NonNull final String shardName) {
-        this.identifier = requireNonNull(identifier);
-        this.shardName = requireNonNull(shardName);
-        limiter = new OperationLimiter(identifier,
-            // 1 extra permit for the ready operation
-            actorUtils.getDatastoreContext().getShardBatchedModificationCount() + 1,
-            TimeUnit.MILLISECONDS.toSeconds(actorUtils.getDatastoreContext().getOperationTimeoutInMillis()));
-    }
-
-    final TransactionIdentifier getIdentifier() {
-        return identifier;
-    }
-
-    final OperationLimiter getLimiter() {
-        return limiter;
-    }
-
-    final String getShardName() {
-        return shardName;
-    }
-
-    abstract @Nullable TransactionContext getTransactionContext();
-
-    /**
-     * Either enqueue or execute specified operation.
-     *
-     * @param op Operation to (eventually) execute
-     */
-    abstract void maybeExecuteTransactionOperation(TransactionOperation op);
-
-    /**
-     * Mark the transaction as ready.
-     *
-     * @param participatingShardNames Shards which participate on the transaction
-     * @return Future indicating the transaction has been readied on the backend
-     */
-    abstract @NonNull Future<ActorSelection> readyTransaction(Optional<SortedSet<String>> participatingShardNames);
-}
index d12364265b03a433fa37b7983f3f3a44d1bc06a7..4b55dda597df474490c6126646904b50101f985a 100644 (file)
@@ -24,7 +24,7 @@ import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStoreProperties.ExportOnRecovery;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,8 +96,6 @@ public class DatastoreContext implements ClientActorConfig {
     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
     private boolean writeOnlyTransactionOptimizationsEnabled = true;
     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
-    @Deprecated(since = "7.0.0", forRemoval = true)
-    private boolean useTellBasedProtocol = true;
     private boolean transactionDebugContextEnabled = false;
     private String shardManagerPersistenceId;
     private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
@@ -148,7 +146,6 @@ public class DatastoreContext implements ClientActorConfig {
         shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
         transactionDebugContextEnabled = other.transactionDebugContextEnabled;
         shardManagerPersistenceId = other.shardManagerPersistenceId;
-        useTellBasedProtocol = other.useTellBasedProtocol;
         backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
         requestTimeout = other.requestTimeout;
         noProgressTimeout = other.noProgressTimeout;
@@ -366,11 +363,6 @@ public class DatastoreContext implements ClientActorConfig {
         return transactionDebugContextEnabled;
     }
 
-    @Deprecated(since = "7.0.0", forRemoval = true)
-    public boolean isUseTellBasedProtocol() {
-        return useTellBasedProtocol;
-    }
-
     public boolean isUseLz4Compression() {
         return useLz4Compression;
     }
@@ -604,12 +596,6 @@ public class DatastoreContext implements ClientActorConfig {
             return this;
         }
 
-        @Deprecated(since = "7.0.0", forRemoval = true)
-        public Builder useTellBasedProtocol(final boolean value) {
-            datastoreContext.useTellBasedProtocol = value;
-            return this;
-        }
-
         public Builder useLz4Compression(final boolean value) {
             datastoreContext.useLz4Compression = value;
             return this;
index d9445f18a1cc237a61376fffcd92d87d20735628..ac50ff30a26e961f81517ac365441e578d1c51e0 100644 (file)
@@ -30,8 +30,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.text.WordUtils;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStorePropertiesContainer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStoreProperties;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStorePropertiesContainer;
 import org.opendaylight.yangtools.yang.common.Uint16;
 import org.opendaylight.yangtools.yang.common.Uint32;
 import org.opendaylight.yangtools.yang.common.Uint64;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DebugThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DebugThreePhaseCommitCohort.java
deleted file mode 100644 (file)
index 4562ed1..0000000
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.List;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.yangtools.yang.common.Empty;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-/**
- * An AbstractThreePhaseCommitCohort implementation used for debugging. If a failure occurs, the transaction
- * call site is printed.
- *
- * @author Thomas Pantelis
- */
-class DebugThreePhaseCommitCohort extends AbstractThreePhaseCommitCohort<Object> {
-    private static final Logger LOG = LoggerFactory.getLogger(DebugThreePhaseCommitCohort.class);
-
-    private final AbstractThreePhaseCommitCohort<?> delegate;
-    private final Throwable debugContext;
-    private final TransactionIdentifier transactionId;
-
-    @SuppressFBWarnings("SLF4J_LOGGER_SHOULD_BE_FINAL")
-    private Logger log = LOG;
-
-    DebugThreePhaseCommitCohort(final TransactionIdentifier transactionId,
-            final AbstractThreePhaseCommitCohort<?> delegate, final Throwable debugContext) {
-        this.delegate = requireNonNull(delegate);
-        this.debugContext = requireNonNull(debugContext);
-        this.transactionId = requireNonNull(transactionId);
-    }
-
-    private <V> ListenableFuture<V> addFutureCallback(final ListenableFuture<V> future) {
-        Futures.addCallback(future, new FutureCallback<V>() {
-            @Override
-            public void onSuccess(final V result) {
-                // no-op
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                log.warn("Transaction {} failed with error \"{}\" - was allocated in the following context",
-                        transactionId, failure, debugContext);
-            }
-        }, MoreExecutors.directExecutor());
-
-        return future;
-    }
-
-    @Override
-    public ListenableFuture<Boolean> canCommit() {
-        return addFutureCallback(delegate.canCommit());
-    }
-
-    @Override
-    public ListenableFuture<Empty> preCommit() {
-        return addFutureCallback(delegate.preCommit());
-    }
-
-    @Override
-    public ListenableFuture<? extends CommitInfo> commit() {
-        return addFutureCallback(delegate.commit());
-    }
-
-    @Override
-    public ListenableFuture<Empty> abort() {
-        return delegate.abort();
-    }
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Override
-    List<Future<Object>> getCohortFutures() {
-        return ((AbstractThreePhaseCommitCohort)delegate).getCohortFutures();
-    }
-
-    @VisibleForTesting
-    void setLogger(final Logger logger) {
-        log = logger;
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedTransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedTransactionContextWrapper.java
deleted file mode 100644 (file)
index 95379de..0000000
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import akka.actor.ActorSelection;
-import akka.dispatch.Futures;
-import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.SortedSet;
-import org.checkerframework.checker.lock.qual.GuardedBy;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-
-/**
- * Delayed implementation of TransactionContextWrapper. Operations destined for the target
- * TransactionContext instance are cached until the TransactionContext instance becomes
- * available at which time they are executed.
- *
- * @author Thomas Pantelis
- */
-final class DelayedTransactionContextWrapper extends AbstractTransactionContextWrapper {
-    private static final Logger LOG = LoggerFactory.getLogger(DelayedTransactionContextWrapper.class);
-
-    /**
-     * The list of transaction operations to execute once the TransactionContext becomes available.
-     */
-    @GuardedBy("queuedTxOperations")
-    private final List<Entry<TransactionOperation, Boolean>> queuedTxOperations = new ArrayList<>();
-
-    /**
-     * The resulting TransactionContext.
-     */
-    private volatile TransactionContext transactionContext;
-    @GuardedBy("queuedTxOperations")
-    private TransactionContext deferredTransactionContext;
-    @GuardedBy("queuedTxOperations")
-    private boolean pendingEnqueue;
-
-    DelayedTransactionContextWrapper(@NonNull final TransactionIdentifier identifier,
-                                     @NonNull final ActorUtils actorUtils, @NonNull final String shardName) {
-        super(identifier, actorUtils, shardName);
-    }
-
-    @Override
-    TransactionContext getTransactionContext() {
-        return transactionContext;
-    }
-
-    @Override
-    void maybeExecuteTransactionOperation(final TransactionOperation op) {
-        final TransactionContext localContext = transactionContext;
-        if (localContext != null) {
-            op.invoke(localContext, null);
-        } else {
-            // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
-            // callback to be executed after the Tx is created.
-            enqueueTransactionOperation(op);
-        }
-    }
-
-    @Override
-    Future<ActorSelection> readyTransaction(final Optional<SortedSet<String>> participatingShardNames) {
-        // avoid the creation of a promise and a TransactionOperation
-        final TransactionContext localContext = transactionContext;
-        if (localContext != null) {
-            return localContext.readyTransaction(null, participatingShardNames);
-        }
-
-        final Promise<ActorSelection> promise = Futures.promise();
-        enqueueTransactionOperation(new TransactionOperation() {
-            @Override
-            public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
-                promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames));
-            }
-        });
-
-        return promise.future();
-    }
-
-    /**
-     * Adds a TransactionOperation to be executed once the TransactionContext becomes available. This method is called
-     * only after the caller has checked (without synchronizing with executePriorTransactionOperations()) that the
-     * context is not available.
-     */
-    private void enqueueTransactionOperation(final TransactionOperation operation) {
-        // We have three things to do here:
-        // - synchronize with executePriorTransactionOperations() so that logical operation ordering is maintained
-        // - acquire a permit for the operation if we still need to enqueue it
-        // - enqueue the operation
-        //
-        // Since each operation needs to acquire a permit exactly once and the limiter is shared between us and the
-        // TransactionContext, we need to know whether an operation has a permit before we enqueue it. Further
-        // complications are:
-        // - this method may be called from the thread invoking executePriorTransactionOperations()
-        // - user may be violating API contract of using the transaction from a single thread
-
-        // As a first step, we will synchronize on the queue and check if the handoff has completed. While we have
-        // the lock, we will assert that we will be enqueing another operation.
-        final TransactionContext contextOnEntry;
-        synchronized (queuedTxOperations) {
-            contextOnEntry = transactionContext;
-            if (contextOnEntry == null) {
-                checkState(!pendingEnqueue, "Concurrent access to transaction %s detected", getIdentifier());
-                pendingEnqueue = true;
-            }
-        }
-
-        // Short-circuit if there is a context
-        if (contextOnEntry != null) {
-            operation.invoke(transactionContext, null);
-            return;
-        }
-
-        boolean cleanupEnqueue = true;
-        TransactionContext finishHandoff = null;
-        try {
-            // Acquire the permit,
-            final boolean havePermit = getLimiter().acquire();
-            if (!havePermit) {
-                LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", getIdentifier(),
-                        getShardName());
-            }
-
-            // Ready to enqueue, take the lock again and append the operation
-            synchronized (queuedTxOperations) {
-                LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
-                queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit));
-                pendingEnqueue = false;
-                cleanupEnqueue = false;
-                finishHandoff = deferredTransactionContext;
-                deferredTransactionContext = null;
-            }
-        } finally {
-            if (cleanupEnqueue) {
-                synchronized (queuedTxOperations) {
-                    pendingEnqueue = false;
-                    finishHandoff = deferredTransactionContext;
-                    deferredTransactionContext = null;
-                }
-            }
-            if (finishHandoff != null) {
-                executePriorTransactionOperations(finishHandoff);
-            }
-        }
-    }
-
-    void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
-        while (true) {
-            // Access to queuedTxOperations and transactionContext must be protected and atomic
-            // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
-            // issues and ensure no TransactionOperation is missed and that they are processed
-            // in the order they occurred.
-
-            // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
-            // in case a TransactionOperation results in another transaction operation being
-            // queued (eg a put operation from a client read Future callback that is notified
-            // synchronously).
-            final Collection<Entry<TransactionOperation, Boolean>> operationsBatch;
-            synchronized (queuedTxOperations) {
-                if (queuedTxOperations.isEmpty()) {
-                    if (!pendingEnqueue) {
-                        // We're done invoking the TransactionOperations so we can now publish the TransactionContext.
-                        localTransactionContext.operationHandOffComplete();
-
-                        // This is null-to-non-null transition after which we are releasing the lock and not doing
-                        // any further processing.
-                        transactionContext = localTransactionContext;
-                    } else {
-                        deferredTransactionContext = localTransactionContext;
-                    }
-                    return;
-                }
-
-                operationsBatch = new ArrayList<>(queuedTxOperations);
-                queuedTxOperations.clear();
-            }
-
-            // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. A slight down-side is
-            // that we need to re-acquire the lock below but this should be negligible.
-            for (Entry<TransactionOperation, Boolean> oper : operationsBatch) {
-                final Boolean permit = oper.getValue();
-                if (permit.booleanValue() && !localTransactionContext.usesOperationLimiting()) {
-                    // If the context is not using limiting we need to release operations as we are queueing them, so
-                    // user threads are not charged for them.
-                    getLimiter().release();
-                }
-                oper.getKey().invoke(localTransactionContext, permit);
-            }
-        }
-    }
-
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DirectTransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DirectTransactionContextWrapper.java
deleted file mode 100644 (file)
index f004088..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import java.util.Optional;
-import java.util.SortedSet;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import scala.concurrent.Future;
-
-/**
- * Direct implementation of TransactionContextWrapper. Operation are executed directly on TransactionContext. Always
- * has completed context and executes on local shard.
- */
-final class DirectTransactionContextWrapper extends AbstractTransactionContextWrapper {
-    private final TransactionContext transactionContext;
-
-    DirectTransactionContextWrapper(@NonNull final TransactionIdentifier identifier,
-                                    @NonNull final ActorUtils actorUtils,
-                                    @NonNull final String shardName,
-                                    @NonNull final TransactionContext transactionContext) {
-        super(identifier, actorUtils, shardName);
-        this.transactionContext = requireNonNull(transactionContext);
-    }
-
-    @Override
-    TransactionContext getTransactionContext() {
-        return transactionContext;
-    }
-
-    @Override
-    void maybeExecuteTransactionOperation(final TransactionOperation op) {
-        op.invoke(transactionContext, null);
-    }
-
-    @Override
-    Future<ActorSelection> readyTransaction(final Optional<SortedSet<String>> participatingShardNames) {
-        return transactionContext.readyTransaction(null, participatingShardNames);
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
deleted file mode 100644 (file)
index b7131b0..0000000
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorSystem;
-import org.opendaylight.controller.cluster.datastore.config.Configuration;
-import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-
-/**
- * Implements a distributed DOMStore using Akka {@code Patterns.ask()}.
- *
- * @deprecated This implementation is destined for removal,
- */
-@Deprecated(since = "7.0.0", forRemoval = true)
-public class DistributedDataStore extends AbstractDataStore {
-    private final TransactionContextFactory txContextFactory;
-
-    public DistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
-            final Configuration configuration, final DatastoreContextFactory datastoreContextFactory,
-            final DatastoreSnapshot restoreFromSnapshot) {
-        super(actorSystem, cluster, configuration, datastoreContextFactory, restoreFromSnapshot);
-        txContextFactory = new TransactionContextFactory(getActorUtils(), getIdentifier());
-    }
-
-    @Override
-    public DOMStoreTransactionChain createTransactionChain() {
-        return txContextFactory.createTransactionChain();
-    }
-
-    @Override
-    public DOMStoreReadTransaction newReadOnlyTransaction() {
-        return new TransactionProxy(txContextFactory, TransactionType.READ_ONLY);
-    }
-
-    @Override
-    public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        getActorUtils().acquireTxCreationPermit();
-        return new TransactionProxy(txContextFactory, TransactionType.WRITE_ONLY);
-    }
-
-    @Override
-    public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        getActorUtils().acquireTxCreationPermit();
-        return new TransactionProxy(txContextFactory, TransactionType.READ_WRITE);
-    }
-
-    @Override
-    public void close() {
-        txContextFactory.close();
-        super.close();
-    }
-}
index 8a50417f67f610df83594fb60dad192000686a17..2325b5bd57acecef51c29b67841ace05f0fc2d8e 100644 (file)
@@ -7,12 +7,10 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorSystem;
 import org.opendaylight.controller.cluster.ActorSystemProvider;
 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
-import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,8 +58,8 @@ public final class DistributedDataStoreFactory {
         final String datastoreName = initialDatastoreContext.getDataStoreName();
         LOG.info("Create data store instance of type : {}", datastoreName);
 
-        final ActorSystem actorSystem = actorSystemProvider.getActorSystem();
-        final DatastoreSnapshot restoreFromSnapshot = datastoreSnapshotRestore.getAndRemove(datastoreName).orElse(null);
+        final var actorSystem = actorSystemProvider.getActorSystem();
+        final var restoreFromSnapshot = datastoreSnapshotRestore.getAndRemove(datastoreName).orElse(null);
 
         final Configuration config;
         if (orgConfig == null) {
@@ -69,24 +67,12 @@ public final class DistributedDataStoreFactory {
         } else {
             config = orgConfig;
         }
-        final ClusterWrapper clusterWrapper = new ClusterWrapperImpl(actorSystem);
-        final DatastoreContextFactory contextFactory = introspector.newContextFactory();
+        final var clusterWrapper = new ClusterWrapperImpl(actorSystem);
+        final var contextFactory = introspector.newContextFactory();
 
-        // This is the potentially-updated datastore context, distinct from the initial one
-        final DatastoreContext datastoreContext = contextFactory.getBaseDatastoreContext();
-
-        final AbstractDataStore dataStore;
-        if (datastoreContext.isUseTellBasedProtocol()) {
-            dataStore = new ClientBackedDataStore(actorSystem, clusterWrapper, config, contextFactory,
-                restoreFromSnapshot);
-            LOG.info("Data store {} is using tell-based protocol", datastoreName);
-        } else {
-            dataStore = new DistributedDataStore(actorSystem, clusterWrapper, config, contextFactory,
-                restoreFromSnapshot);
-            LOG.warn("Data store {} is using ask-based protocol, which will be removed in the next major release",
-                datastoreName);
-        }
-
-        return dataStore;
+        final var ret = new ClientBackedDataStore(actorSystem, clusterWrapper, config, contextFactory,
+            restoreFromSnapshot);
+        LOG.info("Data store {} is using tell-based protocol", datastoreName);
+        return ret;
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalThreePhaseCommitCohort.java
deleted file mode 100644 (file)
index 091bfa0..0000000
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import akka.dispatch.Futures;
-import akka.dispatch.OnComplete;
-import com.google.common.util.concurrent.ListenableFuture;
-import java.util.Optional;
-import java.util.SortedSet;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.mdsal.dom.spi.store.SnapshotBackedWriteTransaction;
-import org.opendaylight.yangtools.yang.common.Empty;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-/**
- * Fake {@link DOMStoreThreePhaseCommitCohort} instantiated for local transactions to conform with the DOM
- * transaction APIs. It is only used to hold the data from a local DOM transaction ready operation and to
- * initiate direct or coordinated commits from the front-end by sending the ReadyLocalTransaction message.
- * It is not actually called by the front-end to perform 3PC thus the canCommit/preCommit/commit methods
- * are no-ops.
- */
-class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
-    private static final Logger LOG = LoggerFactory.getLogger(LocalThreePhaseCommitCohort.class);
-
-    private final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction;
-    private final DataTreeModification modification;
-    private final ActorUtils actorUtils;
-    private final ActorSelection leader;
-    private final Exception operationError;
-
-    protected LocalThreePhaseCommitCohort(final ActorUtils actorUtils, final ActorSelection leader,
-            final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction,
-            final DataTreeModification modification,
-            final Exception operationError) {
-        this.actorUtils = requireNonNull(actorUtils);
-        this.leader = requireNonNull(leader);
-        this.transaction = requireNonNull(transaction);
-        this.modification = requireNonNull(modification);
-        this.operationError = operationError;
-    }
-
-    protected LocalThreePhaseCommitCohort(final ActorUtils actorUtils, final ActorSelection leader,
-            final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction, final Exception operationError) {
-        this.actorUtils = requireNonNull(actorUtils);
-        this.leader = requireNonNull(leader);
-        this.transaction = requireNonNull(transaction);
-        this.operationError = requireNonNull(operationError);
-        modification = null;
-    }
-
-    private Future<Object> initiateCommit(final boolean immediate,
-            final Optional<SortedSet<String>> participatingShardNames) {
-        if (operationError != null) {
-            return Futures.failed(operationError);
-        }
-
-        final ReadyLocalTransaction message = new ReadyLocalTransaction(transaction.getIdentifier(),
-                modification, immediate, participatingShardNames);
-        return actorUtils.executeOperationAsync(leader, message, actorUtils.getTransactionCommitOperationTimeout());
-    }
-
-    Future<ActorSelection> initiateCoordinatedCommit(final Optional<SortedSet<String>> participatingShardNames) {
-        final Future<Object> messageFuture = initiateCommit(false, participatingShardNames);
-        final Future<ActorSelection> ret = TransactionReadyReplyMapper.transform(messageFuture, actorUtils,
-                transaction.getIdentifier());
-        ret.onComplete(new OnComplete<ActorSelection>() {
-            @Override
-            public void onComplete(final Throwable failure, final ActorSelection success) {
-                if (failure != null) {
-                    LOG.warn("Failed to prepare transaction {} on backend", transaction.getIdentifier(), failure);
-                    transactionAborted(transaction);
-                    return;
-                }
-
-                LOG.debug("Transaction {} resolved to actor {}", transaction.getIdentifier(), success);
-            }
-        }, actorUtils.getClientDispatcher());
-
-        return ret;
-    }
-
-    Future<Object> initiateDirectCommit() {
-        final Future<Object> messageFuture = initiateCommit(true, Optional.empty());
-        messageFuture.onComplete(new OnComplete<>() {
-            @Override
-            public void onComplete(final Throwable failure, final Object message) {
-                if (failure != null) {
-                    LOG.warn("Failed to prepare transaction {} on backend", transaction.getIdentifier(), failure);
-                    transactionAborted(transaction);
-                } else if (CommitTransactionReply.isSerializedType(message)) {
-                    LOG.debug("Transaction {} committed successfully", transaction.getIdentifier());
-                    transactionCommitted(transaction);
-                } else {
-                    LOG.error("Transaction {} resulted in unhandled message type {}, aborting",
-                        transaction.getIdentifier(), message.getClass());
-                    transactionAborted(transaction);
-                }
-            }
-        }, actorUtils.getClientDispatcher());
-
-        return messageFuture;
-    }
-
-    @Override
-    public final ListenableFuture<Boolean> canCommit() {
-        // Intended no-op
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public final ListenableFuture<Empty> preCommit() {
-        // Intended no-op
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public final ListenableFuture<Empty> abort() {
-        // Intended no-op
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public final ListenableFuture<CommitInfo> commit() {
-        // Intended no-op
-        throw new UnsupportedOperationException();
-    }
-
-    protected void transactionAborted(final SnapshotBackedWriteTransaction<TransactionIdentifier> aborted) {
-    }
-
-    protected void transactionCommitted(final SnapshotBackedWriteTransaction<TransactionIdentifier> comitted) {
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java
deleted file mode 100644 (file)
index fd3e8cd..0000000
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.mdsal.dom.spi.store.AbstractSnapshotBackedTransactionChain;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.SnapshotBackedWriteTransaction;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
-import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
-
-/**
- * Transaction chain instantiated on top of a locally-available DataTree. It does not instantiate
- * a transaction in the leader and rather chains transactions on top of themselves.
- */
-final class LocalTransactionChain extends AbstractSnapshotBackedTransactionChain<TransactionIdentifier>
-        implements LocalTransactionFactory {
-    private static final Throwable ABORTED = new Throwable("Transaction aborted");
-    private final TransactionChainProxy parent;
-    private final ActorSelection leader;
-    private final ReadOnlyDataTree tree;
-
-    LocalTransactionChain(final TransactionChainProxy parent, final ActorSelection leader,
-            final ReadOnlyDataTree tree) {
-        this.parent = requireNonNull(parent);
-        this.leader = requireNonNull(leader);
-        this.tree = requireNonNull(tree);
-    }
-
-    ReadOnlyDataTree getDataTree() {
-        return tree;
-    }
-
-    @Override
-    protected TransactionIdentifier nextTransactionIdentifier() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected boolean getDebugTransactions() {
-        return false;
-    }
-
-    @Override
-    protected DataTreeSnapshot takeSnapshot() {
-        return tree.takeSnapshot();
-    }
-
-    @Override
-    protected DOMStoreThreePhaseCommitCohort createCohort(
-            final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction,
-            final DataTreeModification modification,
-            final Exception operationError) {
-        return new LocalChainThreePhaseCommitCohort(transaction, modification, operationError);
-    }
-
-    @Override
-    public DOMStoreReadTransaction newReadOnlyTransaction(final TransactionIdentifier identifier) {
-        return super.newReadOnlyTransaction(identifier);
-    }
-
-    @Override
-    public DOMStoreReadWriteTransaction newReadWriteTransaction(final TransactionIdentifier identifier) {
-        return super.newReadWriteTransaction(identifier);
-    }
-
-    @Override
-    public DOMStoreWriteTransaction newWriteOnlyTransaction(final TransactionIdentifier identifier) {
-        return super.newWriteOnlyTransaction(identifier);
-    }
-
-    @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch"})
-    @Override
-    public LocalThreePhaseCommitCohort onTransactionReady(final DOMStoreWriteTransaction tx,
-            final Exception operationError) {
-        checkArgument(tx instanceof SnapshotBackedWriteTransaction);
-        if (operationError != null) {
-            return new LocalChainThreePhaseCommitCohort((SnapshotBackedWriteTransaction<TransactionIdentifier>)tx,
-                    operationError);
-        }
-
-        try {
-            return (LocalThreePhaseCommitCohort) tx.ready();
-        } catch (Exception e) {
-            // Unfortunately we need to cast to SnapshotBackedWriteTransaction here as it's required by
-            // LocalThreePhaseCommitCohort and the base class.
-            return new LocalChainThreePhaseCommitCohort((SnapshotBackedWriteTransaction<TransactionIdentifier>)tx, e);
-        }
-    }
-
-    private class LocalChainThreePhaseCommitCohort extends LocalThreePhaseCommitCohort {
-
-        protected LocalChainThreePhaseCommitCohort(
-                final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction,
-                final DataTreeModification modification, final Exception operationError) {
-            super(parent.getActorUtils(), leader, transaction, modification, operationError);
-        }
-
-        protected LocalChainThreePhaseCommitCohort(
-                final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction,
-                final Exception operationError) {
-            super(parent.getActorUtils(), leader, transaction, operationError);
-        }
-
-        @Override
-        protected void transactionAborted(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
-            onTransactionFailed(transaction, ABORTED);
-        }
-
-        @Override
-        protected void transactionCommitted(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
-            onTransactionCommited(transaction);
-        }
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java
deleted file mode 100644 (file)
index 6b30069..0000000
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.Optional;
-import java.util.SortedSet;
-import java.util.function.Consumer;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
-import org.opendaylight.mdsal.common.api.ReadFailedException;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import scala.concurrent.Future;
-
-/**
- * Processes front-end transaction operations locally before being committed to the destination shard.
- * Instances of this class are used when the destination shard is local to the caller.
- *
- * @author Thomas Pantelis
- */
-abstract class LocalTransactionContext extends TransactionContext {
-    private final DOMStoreTransaction txDelegate;
-    private final LocalTransactionReadySupport readySupport;
-    private Exception operationError;
-
-    LocalTransactionContext(final DOMStoreTransaction txDelegate, final TransactionIdentifier identifier,
-            final LocalTransactionReadySupport readySupport) {
-        super(identifier);
-        this.txDelegate = requireNonNull(txDelegate);
-        this.readySupport = readySupport;
-    }
-
-    abstract DOMStoreWriteTransaction getWriteDelegate();
-
-    abstract DOMStoreReadTransaction getReadDelegate();
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private void executeModification(final Consumer<DOMStoreWriteTransaction> consumer) {
-        incrementModificationCount();
-        if (operationError == null) {
-            try {
-                consumer.accept(getWriteDelegate());
-            } catch (Exception e) {
-                operationError = e;
-            }
-        }
-    }
-
-    @Override
-    void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) {
-        executeModification(transaction -> transaction.delete(path));
-    }
-
-    @Override
-    void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
-        executeModification(transaction -> transaction.merge(path, data));
-    }
-
-    @Override
-    void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
-        executeModification(transaction -> transaction.write(path, data));
-    }
-
-    @Override
-    <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture,
-            final Boolean havePermit) {
-        Futures.addCallback(readCmd.apply(getReadDelegate()), new FutureCallback<T>() {
-            @Override
-            public void onSuccess(final T result) {
-                proxyFuture.set(result);
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                proxyFuture.setException(failure instanceof Exception
-                        ? ReadFailedException.MAPPER.apply((Exception) failure) : failure);
-            }
-        }, MoreExecutors.directExecutor());
-    }
-
-    @Override
-    Future<ActorSelection> readyTransaction(final Boolean havePermit,
-            final Optional<SortedSet<String>> participatingShardNames) {
-        final LocalThreePhaseCommitCohort cohort = ready();
-        return cohort.initiateCoordinatedCommit(participatingShardNames);
-    }
-
-    @Override
-    Future<Object> directCommit(final Boolean havePermit) {
-        final LocalThreePhaseCommitCohort cohort = ready();
-        return cohort.initiateDirectCommit();
-    }
-
-    @Override
-    void closeTransaction() {
-        txDelegate.close();
-    }
-
-    private LocalThreePhaseCommitCohort ready() {
-        logModificationCount();
-        return readySupport.onTransactionReady(getWriteDelegate(), operationError);
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java
deleted file mode 100644 (file)
index e6be3a0..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-
-/**
- * A factory for creating local transactions used by {@link AbstractTransactionContextFactory} to instantiate
- * transactions on shards which are co-located with the shard leader.
- *
- * @author Thomas Pantelis
- */
-interface LocalTransactionFactory extends LocalTransactionReadySupport {
-    DOMStoreReadTransaction newReadOnlyTransaction(TransactionIdentifier identifier);
-
-    DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier);
-
-    DOMStoreWriteTransaction newWriteOnlyTransaction(TransactionIdentifier identifier);
-}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java
deleted file mode 100644 (file)
index 43f6320..0000000
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.SnapshotBackedTransactions;
-import org.opendaylight.mdsal.dom.spi.store.SnapshotBackedWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
-
-/**
- * {@link LocalTransactionFactory} for instantiating backing transactions which are
- * disconnected from each other, ie not chained. These are used by {@link AbstractTransactionContextFactory}
- * to instantiate transactions on shards which are co-located with the shard leader.
- */
-final class LocalTransactionFactoryImpl extends TransactionReadyPrototype<TransactionIdentifier>
-        implements LocalTransactionFactory {
-
-    private final ActorSelection leader;
-    private final ReadOnlyDataTree dataTree;
-    private final ActorUtils actorUtils;
-
-    LocalTransactionFactoryImpl(final ActorUtils actorUtils, final ActorSelection leader,
-            final ReadOnlyDataTree dataTree) {
-        this.leader = requireNonNull(leader);
-        this.dataTree = requireNonNull(dataTree);
-        this.actorUtils = actorUtils;
-    }
-
-    ReadOnlyDataTree getDataTree() {
-        return dataTree;
-    }
-
-    @Override
-    public DOMStoreReadTransaction newReadOnlyTransaction(final TransactionIdentifier identifier) {
-        return SnapshotBackedTransactions.newReadTransaction(identifier, false, dataTree.takeSnapshot());
-    }
-
-    @Override
-    public DOMStoreReadWriteTransaction newReadWriteTransaction(final TransactionIdentifier identifier) {
-        return SnapshotBackedTransactions.newReadWriteTransaction(identifier, false, dataTree.takeSnapshot(), this);
-    }
-
-    @Override
-    public DOMStoreWriteTransaction newWriteOnlyTransaction(final TransactionIdentifier identifier) {
-        return SnapshotBackedTransactions.newWriteTransaction(identifier, false, dataTree.takeSnapshot(), this);
-    }
-
-    @Override
-    protected void transactionAborted(final SnapshotBackedWriteTransaction<TransactionIdentifier> tx) {
-        // No-op
-    }
-
-    @Override
-    protected DOMStoreThreePhaseCommitCohort transactionReady(
-            final SnapshotBackedWriteTransaction<TransactionIdentifier> tx,
-            final DataTreeModification tree,
-            final Exception readyError) {
-        return new LocalThreePhaseCommitCohort(actorUtils, leader, tx, tree, readyError);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public LocalThreePhaseCommitCohort onTransactionReady(final DOMStoreWriteTransaction tx,
-            final Exception operationError) {
-        checkArgument(tx instanceof SnapshotBackedWriteTransaction);
-        if (operationError != null) {
-            return new LocalThreePhaseCommitCohort(actorUtils, leader,
-                    (SnapshotBackedWriteTransaction<TransactionIdentifier>)tx, operationError);
-        }
-
-        return (LocalThreePhaseCommitCohort) tx.ready();
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionReadySupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionReadySupport.java
deleted file mode 100644 (file)
index 103af19..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import org.eclipse.jdt.annotation.NonNull;
-import org.eclipse.jdt.annotation.Nullable;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-
-/**
- * Interface for a class that can "ready" a transaction.
- *
- * @author Thomas Pantelis
- */
-interface LocalTransactionReadySupport {
-    LocalThreePhaseCommitCohort onTransactionReady(@NonNull DOMStoreWriteTransaction tx,
-            @Nullable Exception operationError);
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDOMStoreThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDOMStoreThreePhaseCommitCohort.java
deleted file mode 100644 (file)
index 9b47732..0000000
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import java.util.Collections;
-import java.util.List;
-import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.yangtools.yang.common.Empty;
-import scala.concurrent.Future;
-
-/**
- * A {@link org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort}
- * instance given out for empty transactions.
- */
-final class NoOpDOMStoreThreePhaseCommitCohort extends AbstractThreePhaseCommitCohort<Object> {
-    static final NoOpDOMStoreThreePhaseCommitCohort INSTANCE = new NoOpDOMStoreThreePhaseCommitCohort();
-
-    private NoOpDOMStoreThreePhaseCommitCohort() {
-        // Hidden to prevent instantiation
-    }
-
-    @Override
-    public ListenableFuture<Boolean> canCommit() {
-        return IMMEDIATE_BOOLEAN_SUCCESS;
-    }
-
-    @Override
-    public ListenableFuture<Empty> preCommit() {
-        return IMMEDIATE_EMPTY_SUCCESS;
-    }
-
-    @Override
-    public ListenableFuture<Empty> abort() {
-        return IMMEDIATE_EMPTY_SUCCESS;
-    }
-
-    @Override
-    public ListenableFuture<CommitInfo> commit() {
-        return CommitInfo.emptyFluentFuture();
-    }
-
-    @Override
-    List<Future<Object>> getCohortFutures() {
-        return Collections.emptyList();
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java
deleted file mode 100644 (file)
index bfb0046..0000000
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorSelection;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.Optional;
-import java.util.SortedSet;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
-import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
-import org.opendaylight.mdsal.common.api.DataStoreUnavailableException;
-import org.opendaylight.mdsal.common.api.ReadFailedException;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-final class NoOpTransactionContext extends TransactionContext {
-    private static final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
-
-    private final Throwable failure;
-
-    NoOpTransactionContext(final Throwable failure, final TransactionIdentifier identifier) {
-        super(identifier);
-        this.failure = failure;
-    }
-
-    @Override
-    void closeTransaction() {
-        LOG.debug("NoOpTransactionContext {} closeTransaction called", getIdentifier());
-    }
-
-    @Override
-    Future<Object> directCommit(final Boolean havePermit) {
-        LOG.debug("Tx {} directCommit called, failure", getIdentifier(), failure);
-        return akka.dispatch.Futures.failed(failure);
-    }
-
-    @Override
-    Future<ActorSelection> readyTransaction(final Boolean havePermit,
-            final Optional<SortedSet<String>> participatingShardNamess) {
-        LOG.debug("Tx {} readyTransaction called, failure", getIdentifier(), failure);
-        return akka.dispatch.Futures.failed(failure);
-    }
-
-    @Override
-    <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture, final Boolean havePermit) {
-        LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
-                readCmd.getPath());
-
-        final Throwable t;
-        if (failure instanceof NoShardLeaderException) {
-            t = new DataStoreUnavailableException(failure.getMessage(), failure);
-        } else {
-            t = failure;
-        }
-        proxyFuture.setException(new ReadFailedException("Error executeRead " + readCmd.getClass().getSimpleName()
-                + " for path " + readCmd.getPath(), t));
-    }
-
-    @Override
-    void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) {
-        LOG.debug("Tx {} executeDelete called path = {}", getIdentifier(), path);
-    }
-
-    @Override
-    void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
-        LOG.debug("Tx {} executeMerge called path = {}", getIdentifier(), path);
-    }
-
-    @Override
-    void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
-        LOG.debug("Tx {} executeWrite called path = {}", getIdentifier(), path);
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java
deleted file mode 100644 (file)
index 3f0c98c..0000000
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class for limiting operations.
- */
-public class OperationLimiter  {
-    private static final Logger LOG = LoggerFactory.getLogger(OperationLimiter.class);
-    private final TransactionIdentifier identifier;
-    private final long acquireTimeout;
-    private final Semaphore semaphore;
-    private final int maxPermits;
-
-    OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final long acquireTimeoutSeconds) {
-        this.identifier = requireNonNull(identifier);
-
-        checkArgument(acquireTimeoutSeconds >= 0);
-        this.acquireTimeout = TimeUnit.SECONDS.toNanos(acquireTimeoutSeconds);
-
-        checkArgument(maxPermits >= 0);
-        this.maxPermits = maxPermits;
-        this.semaphore = new Semaphore(maxPermits);
-    }
-
-    boolean acquire() {
-        return acquire(1);
-    }
-
-    boolean acquire(final int acquirePermits) {
-        try {
-            if (semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
-                return true;
-            }
-        } catch (InterruptedException e) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", identifier, e);
-            } else {
-                LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", identifier);
-            }
-        }
-
-        return false;
-    }
-
-    void release() {
-        release(1);
-    }
-
-    void release(final int permits) {
-        this.semaphore.release(permits);
-    }
-
-    @VisibleForTesting
-    TransactionIdentifier getIdentifier() {
-        return identifier;
-    }
-
-    @VisibleForTesting
-    int availablePermits() {
-        return semaphore.availablePermits();
-    }
-
-    /**
-     * Release all the permits.
-     */
-    public void releaseAll() {
-        this.semaphore.release(maxPermits - availablePermits());
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java
deleted file mode 100644 (file)
index ade9c37..0000000
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import akka.dispatch.Futures;
-import akka.dispatch.OnComplete;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.Optional;
-import java.util.SortedSet;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.mdsal.common.api.ReadFailedException;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-/**
- * Redirects front-end transaction operations to a shard for processing. Instances of this class are used
- * when the destination shard is remote to the caller.
- *
- * @author Thomas Pantelis
- */
-final class RemoteTransactionContext extends TransactionContext {
-    private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContext.class);
-
-    private final ActorUtils actorUtils;
-    private final ActorSelection actor;
-    private final OperationLimiter limiter;
-
-    private BatchedModifications batchedModifications;
-    private int totalBatchedModificationsSent;
-    private int batchPermits;
-
-    /**
-     * We have observed a failed modification batch. This transaction context is effectively doomed, as the backend
-     * does not have a correct view of the world. If this happens, we do not limit operations but rather short-cut them
-     * to a either a no-op (modifications) or a failure (reads). Once the transaction is ready, though, we send the
-     * message to resynchronize with the backend, sharing a 'lost message' failure path.
-     */
-    private volatile Throwable failedModification;
-
-    RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor,
-            final ActorUtils actorUtils, final short remoteTransactionVersion, final OperationLimiter limiter) {
-        super(identifier, remoteTransactionVersion);
-        this.limiter = requireNonNull(limiter);
-        this.actor = actor;
-        this.actorUtils = actorUtils;
-    }
-
-    private ActorSelection getActor() {
-        return actor;
-    }
-
-    protected ActorUtils getActorUtils() {
-        return actorUtils;
-    }
-
-    @Override
-    void closeTransaction() {
-        LOG.debug("Tx {} closeTransaction called", getIdentifier());
-        TransactionContextCleanup.untrack(this);
-
-        actorUtils.sendOperationAsync(getActor(), new CloseTransaction(getTransactionVersion()).toSerializable());
-    }
-
-    @Override
-    Future<Object> directCommit(final Boolean havePermit) {
-        LOG.debug("Tx {} directCommit called", getIdentifier());
-
-        // Send the remaining batched modifications, if any, with the ready flag set.
-        bumpPermits(havePermit);
-        return sendBatchedModifications(true, true, Optional.empty());
-    }
-
-    @Override
-    Future<ActorSelection> readyTransaction(final Boolean havePermit,
-            final Optional<SortedSet<String>> participatingShardNames) {
-        logModificationCount();
-
-        LOG.debug("Tx {} readyTransaction called", getIdentifier());
-
-        // Send the remaining batched modifications, if any, with the ready flag set.
-
-        bumpPermits(havePermit);
-        Future<Object> lastModificationsFuture = sendBatchedModifications(true, false, participatingShardNames);
-
-        // Transform the last reply Future into a Future that returns the cohort actor path from
-        // the last reply message. That's the end result of the ready operation.
-        return TransactionReadyReplyMapper.transform(lastModificationsFuture, actorUtils, getIdentifier());
-    }
-
-    private void bumpPermits(final Boolean havePermit) {
-        if (Boolean.TRUE.equals(havePermit)) {
-            ++batchPermits;
-        }
-    }
-
-    private BatchedModifications newBatchedModifications() {
-        return new BatchedModifications(getIdentifier(), getTransactionVersion());
-    }
-
-    private void batchModification(final Modification modification, final boolean havePermit) {
-        incrementModificationCount();
-        if (havePermit) {
-            ++batchPermits;
-        }
-
-        if (batchedModifications == null) {
-            batchedModifications = newBatchedModifications();
-        }
-
-        batchedModifications.addModification(modification);
-
-        if (batchedModifications.getModifications().size()
-                >= actorUtils.getDatastoreContext().getShardBatchedModificationCount()) {
-            sendBatchedModifications();
-        }
-    }
-
-    @VisibleForTesting
-    Future<Object> sendBatchedModifications() {
-        return sendBatchedModifications(false, false, Optional.empty());
-    }
-
-    private Future<Object> sendBatchedModifications(final boolean ready, final boolean doCommitOnReady,
-            final Optional<SortedSet<String>> participatingShardNames) {
-        Future<Object> sent = null;
-        if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) {
-            if (batchedModifications == null) {
-                batchedModifications = newBatchedModifications();
-            }
-
-            LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
-                    batchedModifications.getModifications().size(), ready);
-
-            batchedModifications.setDoCommitOnReady(doCommitOnReady);
-            batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
-
-            final BatchedModifications toSend = batchedModifications;
-            final int permitsToRelease = batchPermits;
-            batchPermits = 0;
-
-            if (ready) {
-                batchedModifications.setReady(participatingShardNames);
-                batchedModifications.setDoCommitOnReady(doCommitOnReady);
-                batchedModifications = null;
-            } else {
-                batchedModifications = newBatchedModifications();
-
-                final Throwable failure = failedModification;
-                if (failure != null) {
-                    // We have observed a modification failure, it does not make sense to send this batch. This speeds
-                    // up the time when the application could be blocked due to messages timing out and operation
-                    // limiter kicking in.
-                    LOG.debug("Tx {} modifications previously failed, not sending a non-ready batch", getIdentifier());
-                    limiter.release(permitsToRelease);
-                    return Futures.failed(failure);
-                }
-            }
-
-            sent = actorUtils.executeOperationAsync(getActor(), toSend.toSerializable(),
-                actorUtils.getTransactionCommitOperationTimeout());
-            sent.onComplete(new OnComplete<>() {
-                @Override
-                public void onComplete(final Throwable failure, final Object success) {
-                    if (failure != null) {
-                        LOG.debug("Tx {} modifications failed", getIdentifier(), failure);
-                        failedModification = failure;
-                    } else {
-                        LOG.debug("Tx {} modifications completed with {}", getIdentifier(), success);
-                    }
-                    limiter.release(permitsToRelease);
-                }
-            }, actorUtils.getClientDispatcher());
-        }
-
-        return sent;
-    }
-
-    @Override
-    void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) {
-        LOG.debug("Tx {} executeDelete called path = {}", getIdentifier(), path);
-        executeModification(new DeleteModification(path), havePermit);
-    }
-
-    @Override
-    void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
-        LOG.debug("Tx {} executeMerge called path = {}", getIdentifier(), path);
-        executeModification(new MergeModification(path, data), havePermit);
-    }
-
-    @Override
-    void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
-        LOG.debug("Tx {} executeWrite called path = {}", getIdentifier(), path);
-        executeModification(new WriteModification(path, data), havePermit);
-    }
-
-    private void executeModification(final AbstractModification modification, final Boolean havePermit) {
-        final boolean permitToRelease;
-        if (havePermit == null) {
-            permitToRelease = failedModification == null && acquireOperation();
-        } else {
-            permitToRelease = havePermit;
-        }
-
-        batchModification(modification, permitToRelease);
-    }
-
-    @Override
-    <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> returnFuture,
-            final Boolean havePermit) {
-        LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
-                readCmd.getPath());
-
-        final Throwable failure = failedModification;
-        if (failure != null) {
-            // If we know there was a previous modification failure, we must not send a read request, as it risks
-            // returning incorrect data. We check this before acquiring an operation simply because we want the app
-            // to complete this transaction as soon as possible.
-            returnFuture.setException(new ReadFailedException("Previous modification failed, cannot "
-                    + readCmd.getClass().getSimpleName() + " for path " + readCmd.getPath(), failure));
-            return;
-        }
-
-        // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
-        // public API contract.
-
-        final boolean permitToRelease = havePermit == null ? acquireOperation() : havePermit;
-        sendBatchedModifications();
-
-        OnComplete<Object> onComplete = new OnComplete<>() {
-            @Override
-            public void onComplete(final Throwable failure, final Object response) {
-                // We have previously acquired an operation, now release it, no matter what happened
-                if (permitToRelease) {
-                    limiter.release();
-                }
-
-                if (failure != null) {
-                    LOG.debug("Tx {} {} operation failed", getIdentifier(), readCmd.getClass().getSimpleName(),
-                        failure);
-
-                    returnFuture.setException(new ReadFailedException("Error checking "
-                        + readCmd.getClass().getSimpleName() + " for path " + readCmd.getPath(), failure));
-                } else {
-                    LOG.debug("Tx {} {} operation succeeded", getIdentifier(), readCmd.getClass().getSimpleName());
-                    readCmd.processResponse(response, returnFuture);
-                }
-            }
-        };
-
-        final Future<Object> future = actorUtils.executeOperationAsync(getActor(),
-            readCmd.asVersion(getTransactionVersion()).toSerializable(), actorUtils.getOperationTimeout());
-        future.onComplete(onComplete, actorUtils.getClientDispatcher());
-    }
-
-    /**
-     * Acquire operation from the limiter if the hand-off has completed. If the hand-off is still ongoing, this method
-     * does nothing.
-     *
-     * @return True if a permit was successfully acquired, false otherwise
-     */
-    private boolean acquireOperation() {
-        checkState(isOperationHandOffComplete(),
-            "Attempted to acquire execute operation permit for transaction %s on actor %s during handoff",
-            getIdentifier(), actor);
-
-        if (limiter.acquire()) {
-            return true;
-        }
-
-        LOG.warn("Failed to acquire execute operation permit for transaction {} on actor {}", getIdentifier(), actor);
-        return false;
-    }
-
-    @Override
-    boolean usesOperationLimiting() {
-        return true;
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
deleted file mode 100644 (file)
index b32ef45..0000000
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import akka.dispatch.OnComplete;
-import akka.pattern.AskTimeoutException;
-import akka.util.Timeout;
-import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
-import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Handles creation of TransactionContext instances for remote transactions. This class creates
- * remote transactions, if necessary, by sending CreateTransaction messages with retries, up to a limit,
- * if the shard doesn't have a leader yet. This is done by scheduling a retry task after a short delay.
- * <p/>
- * The end result from a completed CreateTransaction message is a TransactionContext that is
- * used to perform transaction operations. Transaction operations that occur before the
- * CreateTransaction completes are cached via a DelayedTransactionContextWrapper and executed once the
- * CreateTransaction completes, successfully or not.
- */
-final class RemoteTransactionContextSupport {
-    private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContextSupport.class);
-
-    private static final long CREATE_TX_TRY_INTERVAL_IN_MS = 1000;
-    private static final long MAX_CREATE_TX_MSG_TIMEOUT_IN_MS = 5000;
-
-    private final TransactionProxy parent;
-    private final String shardName;
-
-    /**
-     * The target primary shard.
-     */
-    private volatile PrimaryShardInfo primaryShardInfo;
-
-    /**
-     * The total timeout for creating a tx on the primary shard.
-     */
-    private volatile long totalCreateTxTimeout;
-
-    private final Timeout createTxMessageTimeout;
-
-    private final DelayedTransactionContextWrapper transactionContextWrapper;
-
-    RemoteTransactionContextSupport(final DelayedTransactionContextWrapper transactionContextWrapper,
-            final TransactionProxy parent, final String shardName) {
-        this.parent = requireNonNull(parent);
-        this.shardName = shardName;
-        this.transactionContextWrapper = transactionContextWrapper;
-
-        // For the total create tx timeout, use 2 times the election timeout. This should be enough time for
-        // a leader re-election to occur if we happen to hit it in transition.
-        totalCreateTxTimeout = parent.getActorUtils().getDatastoreContext().getShardRaftConfig()
-                .getElectionTimeOutInterval().toMillis() * 2;
-
-        // We'll use the operationTimeout for the the create Tx message timeout so it can be set appropriately
-        // for unit tests but cap it at MAX_CREATE_TX_MSG_TIMEOUT_IN_MS. The operationTimeout could be set
-        // larger than the totalCreateTxTimeout in production which we don't want.
-        long operationTimeout = parent.getActorUtils().getOperationTimeout().duration().toMillis();
-        createTxMessageTimeout = new Timeout(Math.min(operationTimeout, MAX_CREATE_TX_MSG_TIMEOUT_IN_MS),
-                TimeUnit.MILLISECONDS);
-    }
-
-    String getShardName() {
-        return shardName;
-    }
-
-    private TransactionType getTransactionType() {
-        return parent.getType();
-    }
-
-    private ActorUtils getActorUtils() {
-        return parent.getActorUtils();
-    }
-
-    private TransactionIdentifier getIdentifier() {
-        return parent.getIdentifier();
-    }
-
-    /**
-     * Sets the target primary shard and initiates a CreateTransaction try.
-     */
-    void setPrimaryShard(final PrimaryShardInfo newPrimaryShardInfo) {
-        primaryShardInfo = newPrimaryShardInfo;
-
-        if (getTransactionType() == TransactionType.WRITE_ONLY
-                && getActorUtils().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
-            ActorSelection primaryShard = newPrimaryShardInfo.getPrimaryShardActor();
-
-            LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
-                getIdentifier(), primaryShard);
-
-            // For write-only Tx's we prepare the transaction modifications directly on the shard actor
-            // to avoid the overhead of creating a separate transaction actor.
-            transactionContextWrapper.executePriorTransactionOperations(createValidTransactionContext(
-                    primaryShard, String.valueOf(primaryShard.path()), newPrimaryShardInfo.getPrimaryShardVersion()));
-        } else {
-            tryCreateTransaction();
-        }
-    }
-
-    /**
-      Performs a CreateTransaction try async.
-     */
-    private void tryCreateTransaction() {
-        LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(),
-                primaryShardInfo.getPrimaryShardActor());
-
-        Object serializedCreateMessage = new CreateTransaction(getIdentifier(), getTransactionType().ordinal(),
-                    primaryShardInfo.getPrimaryShardVersion()).toSerializable();
-
-        Future<Object> createTxFuture = getActorUtils().executeOperationAsync(
-                primaryShardInfo.getPrimaryShardActor(), serializedCreateMessage, createTxMessageTimeout);
-
-        createTxFuture.onComplete(new OnComplete<>() {
-            @Override
-            public void onComplete(final Throwable failure, final Object response) {
-                onCreateTransactionComplete(failure, response);
-            }
-        }, getActorUtils().getClientDispatcher());
-    }
-
-    private void tryFindPrimaryShard() {
-        LOG.debug("Tx {} Retrying findPrimaryShardAsync for shard {}", getIdentifier(), shardName);
-
-        primaryShardInfo = null;
-        Future<PrimaryShardInfo> findPrimaryFuture = getActorUtils().findPrimaryShardAsync(shardName);
-        findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
-            @Override
-            public void onComplete(final Throwable failure, final PrimaryShardInfo newPrimaryShardInfo) {
-                onFindPrimaryShardComplete(failure, newPrimaryShardInfo);
-            }
-        }, getActorUtils().getClientDispatcher());
-    }
-
-    private void onFindPrimaryShardComplete(final Throwable failure, final PrimaryShardInfo newPrimaryShardInfo) {
-        if (failure == null) {
-            primaryShardInfo = newPrimaryShardInfo;
-            tryCreateTransaction();
-        } else {
-            LOG.debug("Tx {}: Find primary for shard {} failed", getIdentifier(), shardName, failure);
-
-            onCreateTransactionComplete(failure, null);
-        }
-    }
-
-    private void onCreateTransactionComplete(final Throwable failure, final Object response) {
-        // An AskTimeoutException will occur if the local shard forwards to an unavailable remote leader or
-        // the cached remote leader actor is no longer available.
-        boolean retryCreateTransaction = primaryShardInfo != null
-                && (failure instanceof NoShardLeaderException || failure instanceof AskTimeoutException);
-
-        // Schedule a retry unless we're out of retries. Note: totalCreateTxTimeout is volatile as it may
-        // be written by different threads however not concurrently, therefore decrementing it
-        // non-atomically here is ok.
-        if (retryCreateTransaction && totalCreateTxTimeout > 0) {
-            long scheduleInterval = CREATE_TX_TRY_INTERVAL_IN_MS;
-            if (failure instanceof AskTimeoutException) {
-                // Since we use the createTxMessageTimeout for the CreateTransaction request and it timed
-                // out, subtract it from the total timeout. Also since the createTxMessageTimeout period
-                // has already elapsed, we can immediately schedule the retry (10 ms is virtually immediate).
-                totalCreateTxTimeout -= createTxMessageTimeout.duration().toMillis();
-                scheduleInterval = 10;
-            }
-
-            totalCreateTxTimeout -= scheduleInterval;
-
-            LOG.debug("Tx {}: create tx on shard {} failed with exception \"{}\" - scheduling retry in {} ms",
-                    getIdentifier(), shardName, failure, scheduleInterval);
-
-            getActorUtils().getActorSystem().scheduler().scheduleOnce(
-                    FiniteDuration.create(scheduleInterval, TimeUnit.MILLISECONDS),
-                    this::tryFindPrimaryShard, getActorUtils().getClientDispatcher());
-            return;
-        }
-
-        createTransactionContext(failure, response);
-    }
-
-    private void createTransactionContext(final Throwable failure, final Object response) {
-        // Create the TransactionContext from the response or failure. Store the new
-        // TransactionContext locally until we've completed invoking the
-        // TransactionOperations. This avoids thread timing issues which could cause
-        // out-of-order TransactionOperations. Eg, on a modification operation, if the
-        // TransactionContext is non-null, then we directly call the TransactionContext.
-        // However, at the same time, the code may be executing the cached
-        // TransactionOperations. So to avoid thus timing, we don't publish the
-        // TransactionContext until after we've executed all cached TransactionOperations.
-        TransactionContext localTransactionContext;
-        if (failure != null) {
-            LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
-
-            Throwable resultingEx = failure;
-            if (failure instanceof AskTimeoutException) {
-                resultingEx = new ShardLeaderNotRespondingException(String.format(
-                        "Could not create a %s transaction on shard %s. The shard leader isn't responding.",
-                        parent.getType(), shardName), failure);
-            } else if (!(failure instanceof NoShardLeaderException)) {
-                resultingEx = new Exception(String.format(
-                    "Error creating %s transaction on shard %s", parent.getType(), shardName), failure);
-            }
-
-            localTransactionContext = new NoOpTransactionContext(resultingEx, getIdentifier());
-        } else if (CreateTransactionReply.isSerializedType(response)) {
-            localTransactionContext = createValidTransactionContext(
-                    CreateTransactionReply.fromSerializable(response));
-        } else {
-            IllegalArgumentException exception = new IllegalArgumentException(String.format(
-                    "Invalid reply type %s for CreateTransaction", response.getClass()));
-
-            localTransactionContext = new NoOpTransactionContext(exception, getIdentifier());
-        }
-        transactionContextWrapper.executePriorTransactionOperations(localTransactionContext);
-    }
-
-    private TransactionContext createValidTransactionContext(final CreateTransactionReply reply) {
-        LOG.debug("Tx {} Received {}", getIdentifier(), reply);
-
-        return createValidTransactionContext(getActorUtils().actorSelection(reply.getTransactionPath()),
-                reply.getTransactionPath(), primaryShardInfo.getPrimaryShardVersion());
-    }
-
-    private TransactionContext createValidTransactionContext(final ActorSelection transactionActor,
-            final String transactionPath, final short remoteTransactionVersion) {
-        final TransactionContext ret = new RemoteTransactionContext(transactionContextWrapper.getIdentifier(),
-                transactionActor, getActorUtils(), remoteTransactionVersion, transactionContextWrapper.getLimiter());
-
-        if (parent.getType() == TransactionType.READ_ONLY) {
-            TransactionContextCleanup.track(parent, ret);
-        }
-
-        return ret;
-    }
-}
-
index ceff3e31f641a84c00b209f97433ab3f14eb1651..e6c40d0809eface25b01bce3b3bbae686879f49a 100644 (file)
@@ -109,7 +109,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.Payload;
 import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStoreProperties.ExportOnRecovery;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
 import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SingleCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SingleCommitCohortProxy.java
deleted file mode 100644 (file)
index c099c45..0000000
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import akka.dispatch.OnComplete;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.List;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.common.Empty;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-/**
- * A cohort proxy implementation for a single-shard transaction commit. If the transaction was a direct commit
- * to the shard, this implementation elides the CanCommitTransaction and CommitTransaction messages to the
- * shard as an optimization.
- *
- * @author Thomas Pantelis
- */
-class SingleCommitCohortProxy extends AbstractThreePhaseCommitCohort<Object> {
-    private static final Logger LOG = LoggerFactory.getLogger(SingleCommitCohortProxy.class);
-
-    private final ActorUtils actorUtils;
-    private final Future<Object> cohortFuture;
-    private final TransactionIdentifier transactionId;
-    private volatile DOMStoreThreePhaseCommitCohort delegateCohort = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
-    private final OperationCallback.Reference operationCallbackRef;
-
-    SingleCommitCohortProxy(final ActorUtils actorUtils, final Future<Object> cohortFuture,
-            final TransactionIdentifier transactionId, final OperationCallback.Reference operationCallbackRef) {
-        this.actorUtils = actorUtils;
-        this.cohortFuture = cohortFuture;
-        this.transactionId = requireNonNull(transactionId);
-        this.operationCallbackRef = operationCallbackRef;
-    }
-
-    @Override
-    public ListenableFuture<Boolean> canCommit() {
-        LOG.debug("Tx {} canCommit", transactionId);
-
-        final SettableFuture<Boolean> returnFuture = SettableFuture.create();
-
-        cohortFuture.onComplete(new OnComplete<>() {
-            @Override
-            public void onComplete(final Throwable failure, final Object cohortResponse) {
-                if (failure != null) {
-                    operationCallbackRef.get().failure();
-                    returnFuture.setException(failure);
-                    return;
-                }
-
-                operationCallbackRef.get().success();
-
-                LOG.debug("Tx {} successfully completed direct commit", transactionId);
-
-                // The Future was the result of a direct commit to the shard, essentially eliding the
-                // front-end 3PC coordination. We don't really care about the specific Future
-                // response object, only that it completed successfully. At this point the Tx is complete
-                // so return true. The subsequent preCommit and commit phases will be no-ops, ie return
-                // immediate success, to complete the 3PC for the front-end.
-                returnFuture.set(Boolean.TRUE);
-            }
-        }, actorUtils.getClientDispatcher());
-
-        return returnFuture;
-    }
-
-    @Override
-    public ListenableFuture<Empty> preCommit() {
-        return delegateCohort.preCommit();
-    }
-
-    @Override
-    public ListenableFuture<Empty> abort() {
-        return delegateCohort.abort();
-    }
-
-    @Override
-    public ListenableFuture<? extends CommitInfo> commit() {
-        return delegateCohort.commit();
-    }
-
-    @Override
-    List<Future<Object>> getCohortFutures() {
-        return List.of(cohortFuture);
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
deleted file mode 100644 (file)
index 0eded31..0000000
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import akka.dispatch.Futures;
-import akka.dispatch.OnComplete;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-
-/**
- * A chain of {@link TransactionProxy}s. It allows a single open transaction to be open
- * at a time. For remote transactions, it also tracks the outstanding readiness requests
- * towards the shard and unblocks operations only after all have completed.
- */
-final class TransactionChainProxy extends AbstractTransactionContextFactory<LocalTransactionChain>
-        implements DOMStoreTransactionChain {
-    private abstract static class State {
-        /**
-         * Check if it is okay to allocate a new transaction.
-         * @throws IllegalStateException if a transaction may not be allocated.
-         */
-        abstract void checkReady();
-
-        /**
-         * Return the future which needs to be waited for before shard information
-         * is returned (which unblocks remote transactions).
-         * @return Future to wait for, or null of no wait is necessary
-         */
-        abstract Future<?> previousFuture();
-    }
-
-    private abstract static class Pending extends State {
-        private final TransactionIdentifier transaction;
-        private final Future<?> previousFuture;
-
-        Pending(final TransactionIdentifier transaction, final Future<?> previousFuture) {
-            this.previousFuture = previousFuture;
-            this.transaction = requireNonNull(transaction);
-        }
-
-        @Override
-        final Future<?> previousFuture() {
-            return previousFuture;
-        }
-
-        final TransactionIdentifier getIdentifier() {
-            return transaction;
-        }
-    }
-
-    private static final class Allocated extends Pending {
-        Allocated(final TransactionIdentifier transaction, final Future<?> previousFuture) {
-            super(transaction, previousFuture);
-        }
-
-        @Override
-        void checkReady() {
-            throw new IllegalStateException(String.format("Previous transaction %s is not ready yet", getIdentifier()));
-        }
-    }
-
-    private static final class Submitted extends Pending {
-        Submitted(final TransactionIdentifier transaction, final Future<?> previousFuture) {
-            super(transaction, previousFuture);
-        }
-
-        @Override
-        void checkReady() {
-            // Okay to allocate
-        }
-    }
-
-    private abstract static class DefaultState extends State {
-        @Override
-        final Future<?> previousFuture() {
-            return null;
-        }
-    }
-
-    private static final State IDLE_STATE = new DefaultState() {
-        @Override
-        void checkReady() {
-            // Okay to allocate
-        }
-    };
-
-    private static final State CLOSED_STATE = new DefaultState() {
-        @Override
-        void checkReady() {
-            throw new DOMTransactionChainClosedException("Transaction chain has been closed");
-        }
-    };
-
-    private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class);
-    private static final AtomicReferenceFieldUpdater<TransactionChainProxy, State> STATE_UPDATER =
-            AtomicReferenceFieldUpdater.newUpdater(TransactionChainProxy.class, State.class, "currentState");
-
-    private final TransactionContextFactory parent;
-    private volatile State currentState = IDLE_STATE;
-
-    /**
-     * This map holds Promise instances for each read-only tx. It is used to maintain ordering of tx creates
-     * wrt to read-only tx's between this class and a LocalTransactionChain since they're bridged by
-     * asynchronous futures. Otherwise, in the following scenario, eg:
-     * <p/>
-     *   1) Create write tx1 on chain
-     *   2) do write and submit
-     *   3) Create read-only tx2 on chain and issue read
-     *   4) Create write tx3 on chain, do write but do not submit
-     * <p/>
-     * if the sequence/timing is right, tx3 may create its local tx on the LocalTransactionChain before tx2,
-     * which results in tx2 failing b/c tx3 isn't ready yet. So maintaining ordering prevents this issue
-     * (see Bug 4774).
-     * <p/>
-     * A Promise is added via newReadOnlyTransaction. When the parent class completes the primary shard
-     * lookup and creates the TransactionContext (either success or failure), onTransactionContextCreated is
-     * called which completes the Promise. A write tx that is created prior to completion will wait on the
-     * Promise's Future via findPrimaryShard.
-     */
-    private final ConcurrentMap<TransactionIdentifier, Promise<Object>> priorReadOnlyTxPromises =
-            new ConcurrentHashMap<>();
-
-    TransactionChainProxy(final TransactionContextFactory parent, final LocalHistoryIdentifier historyId) {
-        super(parent.getActorUtils(), historyId);
-        this.parent = parent;
-    }
-
-    @Override
-    public DOMStoreReadTransaction newReadOnlyTransaction() {
-        currentState.checkReady();
-        TransactionProxy transactionProxy = new TransactionProxy(this, TransactionType.READ_ONLY);
-        priorReadOnlyTxPromises.put(transactionProxy.getIdentifier(), Futures.<Object>promise());
-        return transactionProxy;
-    }
-
-    @Override
-    public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        getActorUtils().acquireTxCreationPermit();
-        return allocateWriteTransaction(TransactionType.READ_WRITE);
-    }
-
-    @Override
-    public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        getActorUtils().acquireTxCreationPermit();
-        return allocateWriteTransaction(TransactionType.WRITE_ONLY);
-    }
-
-    @Override
-    public void close() {
-        currentState = CLOSED_STATE;
-
-        // Send a close transaction chain request to each and every shard
-
-        getActorUtils().broadcast(version -> new CloseTransactionChain(getHistoryId(), version).toSerializable(),
-                CloseTransactionChain.class);
-    }
-
-    private TransactionProxy allocateWriteTransaction(final TransactionType type) {
-        State localState = currentState;
-        localState.checkReady();
-
-        final TransactionProxy ret = new TransactionProxy(this, type);
-        currentState = new Allocated(ret.getIdentifier(), localState.previousFuture());
-        return ret;
-    }
-
-    @Override
-    protected LocalTransactionChain factoryForShard(final String shardName, final ActorSelection shardLeader,
-            final ReadOnlyDataTree dataTree) {
-        final LocalTransactionChain ret = new LocalTransactionChain(this, shardLeader, dataTree);
-        LOG.debug("Allocated transaction chain {} for shard {} leader {}", ret, shardName, shardLeader);
-        return ret;
-    }
-
-    /**
-     * This method is overridden to ensure the previous Tx's ready operations complete
-     * before we initiate the next Tx in the chain to avoid creation failures if the
-     * previous Tx's ready operations haven't completed yet.
-     */
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    @Override
-    protected Future<PrimaryShardInfo> findPrimaryShard(final String shardName, final TransactionIdentifier txId) {
-        // Read current state atomically
-        final State localState = currentState;
-
-        // There are no outstanding futures, shortcut
-        Future<?> previous = localState.previousFuture();
-        if (previous == null) {
-            return combineFutureWithPossiblePriorReadOnlyTxFutures(parent.findPrimaryShard(shardName, txId), txId);
-        }
-
-        final String previousTransactionId;
-
-        if (localState instanceof Pending) {
-            previousTransactionId = ((Pending) localState).getIdentifier().toString();
-            LOG.debug("Tx: {} - waiting for ready futures with pending Tx {}", txId, previousTransactionId);
-        } else {
-            previousTransactionId = "";
-            LOG.debug("Waiting for ready futures on chain {}", getHistoryId());
-        }
-
-        previous = combineFutureWithPossiblePriorReadOnlyTxFutures(previous, txId);
-
-        // Add a callback for completion of the combined Futures.
-        final Promise<PrimaryShardInfo> returnPromise = Futures.promise();
-
-        final OnComplete onComplete = new OnComplete() {
-            @Override
-            public void onComplete(final Throwable failure, final Object notUsed) {
-                if (failure != null) {
-                    // A Ready Future failed so fail the returned Promise.
-                    LOG.error("Tx: {} - ready future failed for previous Tx {}", txId, previousTransactionId);
-                    returnPromise.failure(failure);
-                } else {
-                    LOG.debug("Tx: {} - previous Tx {} readied - proceeding to FindPrimaryShard",
-                            txId, previousTransactionId);
-
-                    // Send the FindPrimaryShard message and use the resulting Future to complete the
-                    // returned Promise.
-                    returnPromise.completeWith(parent.findPrimaryShard(shardName, txId));
-                }
-            }
-        };
-
-        previous.onComplete(onComplete, getActorUtils().getClientDispatcher());
-        return returnPromise.future();
-    }
-
-    private <T> Future<T> combineFutureWithPossiblePriorReadOnlyTxFutures(final Future<T> future,
-            final TransactionIdentifier txId) {
-        return priorReadOnlyTxPromises.isEmpty() || priorReadOnlyTxPromises.containsKey(txId) ? future
-                // Tough luck, we need do some work
-                : combineWithPriorReadOnlyTxFutures(future, txId);
-    }
-
-    // Split out of the common path
-    private <T> Future<T> combineWithPriorReadOnlyTxFutures(final Future<T> future, final TransactionIdentifier txId) {
-        // Take a stable snapshot, and check if we raced
-        final List<Entry<TransactionIdentifier, Promise<Object>>> priorReadOnlyTxPromiseEntries =
-                new ArrayList<>(priorReadOnlyTxPromises.entrySet());
-        if (priorReadOnlyTxPromiseEntries.isEmpty()) {
-            return future;
-        }
-
-        final List<Future<Object>> priorReadOnlyTxFutures = new ArrayList<>(priorReadOnlyTxPromiseEntries.size());
-        for (Entry<TransactionIdentifier, Promise<Object>> entry: priorReadOnlyTxPromiseEntries) {
-            LOG.debug("Tx: {} - waiting on future for prior read-only Tx {}", txId, entry.getKey());
-            priorReadOnlyTxFutures.add(entry.getValue().future());
-        }
-
-        final Future<Iterable<Object>> combinedFutures = Futures.sequence(priorReadOnlyTxFutures,
-            getActorUtils().getClientDispatcher());
-
-        final Promise<T> returnPromise = Futures.promise();
-        final OnComplete<Iterable<Object>> onComplete = new OnComplete<>() {
-            @Override
-            public void onComplete(final Throwable failure, final Iterable<Object> notUsed) {
-                LOG.debug("Tx: {} - prior read-only Tx futures complete", txId);
-
-                // Complete the returned Promise with the original Future.
-                returnPromise.completeWith(future);
-            }
-        };
-
-        combinedFutures.onComplete(onComplete, getActorUtils().getClientDispatcher());
-        return returnPromise.future();
-    }
-
-    @Override
-    protected <T> void onTransactionReady(final TransactionIdentifier transaction,
-            final Collection<Future<T>> cohortFutures) {
-        final State localState = currentState;
-        checkState(localState instanceof Allocated, "Readying transaction %s while state is %s", transaction,
-            localState);
-        final TransactionIdentifier currentTx = ((Allocated)localState).getIdentifier();
-        checkState(transaction.equals(currentTx), "Readying transaction %s while %s is allocated", transaction,
-            currentTx);
-
-        // Transaction ready and we are not waiting for futures -- go to idle
-        if (cohortFutures.isEmpty()) {
-            currentState = IDLE_STATE;
-            return;
-        }
-
-        // Combine the ready Futures into 1
-        final Future<Iterable<T>> combined = Futures.sequence(cohortFutures, getActorUtils().getClientDispatcher());
-
-        // Record the we have outstanding futures
-        final State newState = new Submitted(transaction, combined);
-        currentState = newState;
-
-        // Attach a completion reset, but only if we do not allocate a transaction
-        // in-between
-        combined.onComplete(new OnComplete<Iterable<T>>() {
-            @Override
-            public void onComplete(final Throwable arg0, final Iterable<T> arg1) {
-                STATE_UPDATER.compareAndSet(TransactionChainProxy.this, newState, IDLE_STATE);
-            }
-        }, getActorUtils().getClientDispatcher());
-    }
-
-    @Override
-    protected void onTransactionContextCreated(final TransactionIdentifier transactionId) {
-        Promise<Object> promise = priorReadOnlyTxPromises.remove(transactionId);
-        if (promise != null) {
-            promise.success(null);
-        }
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
deleted file mode 100644 (file)
index 549136b..0000000
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorSelection;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.Optional;
-import java.util.SortedSet;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
-import org.opendaylight.yangtools.concepts.AbstractSimpleIdentifiable;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-abstract class TransactionContext extends AbstractSimpleIdentifiable<TransactionIdentifier> {
-    private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
-
-    private final short transactionVersion;
-
-    private long modificationCount = 0;
-    private boolean handOffComplete;
-
-    TransactionContext(final TransactionIdentifier transactionIdentifier) {
-        this(transactionIdentifier, DataStoreVersions.CURRENT_VERSION);
-    }
-
-    TransactionContext(final TransactionIdentifier transactionIdentifier, final short transactionVersion) {
-        super(transactionIdentifier);
-        this.transactionVersion = transactionVersion;
-    }
-
-    final short getTransactionVersion() {
-        return transactionVersion;
-    }
-
-    final void incrementModificationCount() {
-        modificationCount++;
-    }
-
-    final void logModificationCount() {
-        LOG.debug("Total modifications on Tx {} = [ {} ]", getIdentifier(), modificationCount);
-    }
-
-    /**
-     * Invoked by {@link AbstractTransactionContextWrapper} when it has finished handing
-     * off operations to this context. From this point on, the context is responsible
-     * for throttling operations.
-     *
-     * <p>
-     * Implementations can rely on the wrapper calling this operation in a synchronized
-     * block, so they do not need to ensure visibility of this state transition themselves.
-     */
-    final void operationHandOffComplete() {
-        handOffComplete = true;
-    }
-
-    final boolean isOperationHandOffComplete() {
-        return handOffComplete;
-    }
-
-    /**
-     * A TransactionContext that uses operation limiting should return true else false.
-     *
-     * @return true if operation limiting is used, false otherwise
-     */
-    boolean usesOperationLimiting() {
-        return false;
-    }
-
-    abstract void executeDelete(YangInstanceIdentifier path, Boolean havePermit);
-
-    abstract void executeMerge(YangInstanceIdentifier path, NormalizedNode data, Boolean havePermit);
-
-    abstract void executeWrite(YangInstanceIdentifier path, NormalizedNode data, Boolean havePermit);
-
-    abstract <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> proxyFuture, Boolean havePermit);
-
-    abstract Future<ActorSelection> readyTransaction(Boolean havePermit,
-            Optional<SortedSet<String>> participatingShardNames);
-
-    abstract Future<Object> directCommit(Boolean havePermit);
-
-    abstract void closeTransaction();
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextCleanup.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextCleanup.java
deleted file mode 100644 (file)
index ef8cc49..0000000
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import com.google.common.base.FinalizablePhantomReference;
-import com.google.common.base.FinalizableReferenceQueue;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A PhantomReference that closes remote transactions for a TransactionContext when it's
- * garbage collected. This is used for read-only transactions as they're not explicitly closed
- * by clients. So the only way to detect that a transaction is no longer in use and it's safe
- * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
- * but TransactionProxy instances should generally be short-lived enough to avoid being moved
- * to the old generation space and thus should be cleaned up in a timely manner as the GC
- * runs on the young generation (eden, swap1...) space much more frequently.
- */
-final class TransactionContextCleanup extends FinalizablePhantomReference<TransactionProxy> {
-    private static final Logger LOG = LoggerFactory.getLogger(TransactionContextCleanup.class);
-    /**
-     * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
-     * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
-     * trickery to clean up its internal thread when the bundle is unloaded.
-     */
-    private static final FinalizableReferenceQueue QUEUE = new FinalizableReferenceQueue();
-
-    /**
-     * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
-     * necessary because PhantomReferences need a hard reference so they're not garbage collected.
-     * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
-     * and thus becomes eligible for garbage collection.
-     */
-    private static final Map<TransactionContext, TransactionContextCleanup> CACHE = new ConcurrentHashMap<>();
-
-    private final TransactionContext cleanup;
-
-    private TransactionContextCleanup(final TransactionProxy referent, final TransactionContext cleanup) {
-        super(referent, QUEUE);
-        this.cleanup = cleanup;
-    }
-
-    static void track(final TransactionProxy referent, final TransactionContext cleanup) {
-        final TransactionContextCleanup ret = new TransactionContextCleanup(referent, cleanup);
-        CACHE.put(cleanup, ret);
-    }
-
-    @Override
-    public void finalizeReferent() {
-        LOG.trace("Cleaning up {} Tx actors", cleanup);
-
-        if (CACHE.remove(cleanup) != null) {
-            cleanup.closeTransaction();
-        }
-    }
-
-    static void untrack(final TransactionContext cleanup) {
-        CACHE.remove(cleanup);
-    }
-}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextFactory.java
deleted file mode 100644 (file)
index 3477024..0000000
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorSelection;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicLong;
-import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
-import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
-import scala.concurrent.Future;
-
-/**
- * An {@link AbstractTransactionContextFactory} which produces TransactionContext instances for single
- * transactions (ie not chained).
- */
-final class TransactionContextFactory extends AbstractTransactionContextFactory<LocalTransactionFactoryImpl> {
-    private final AtomicLong nextHistory = new AtomicLong(1);
-
-    TransactionContextFactory(final ActorUtils actorUtils, final ClientIdentifier clientId) {
-        super(actorUtils, new LocalHistoryIdentifier(clientId, 0));
-    }
-
-    @Override
-    public void close() {
-    }
-
-    @Override
-    protected LocalTransactionFactoryImpl factoryForShard(final String shardName, final ActorSelection shardLeader,
-            final ReadOnlyDataTree dataTree) {
-        return new LocalTransactionFactoryImpl(getActorUtils(), shardLeader, dataTree);
-    }
-
-    @Override
-    protected Future<PrimaryShardInfo> findPrimaryShard(final String shardName, final TransactionIdentifier txId) {
-        return getActorUtils().findPrimaryShardAsync(shardName);
-    }
-
-    @Override
-    protected <T> void onTransactionReady(final TransactionIdentifier transaction,
-            final Collection<Future<T>> cohortFutures) {
-        // Transactions are disconnected, this is a no-op
-    }
-
-    DOMStoreTransactionChain createTransactionChain() {
-        return new TransactionChainProxy(this, new LocalHistoryIdentifier(getHistoryId().getClientId(),
-                nextHistory.getAndIncrement()));
-    }
-
-    @Override
-    protected void onTransactionContextCreated(final TransactionIdentifier transactionId) {
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionModificationOperation.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionModificationOperation.java
deleted file mode 100644 (file)
index eeaec6b..0000000
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * A TransactionOperation to apply a specific modification. Subclasses provide type capture of required data, so that
- * we instantiate AbstractModification subclasses for the bare minimum time required.
- */
-abstract class TransactionModificationOperation extends TransactionOperation {
-    private abstract static class AbstractDataOperation extends TransactionModificationOperation {
-        private final NormalizedNode data;
-
-        AbstractDataOperation(final YangInstanceIdentifier path, final NormalizedNode data) {
-            super(path);
-            this.data = requireNonNull(data);
-        }
-
-        final NormalizedNode data() {
-            return data;
-        }
-    }
-
-    static final class DeleteOperation extends TransactionModificationOperation {
-        DeleteOperation(final YangInstanceIdentifier path) {
-            super(path);
-        }
-
-        @Override
-        protected void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
-            transactionContext.executeDelete(path(), havePermit);
-        }
-    }
-
-    static final class MergeOperation extends AbstractDataOperation {
-        MergeOperation(final YangInstanceIdentifier path, final NormalizedNode data) {
-            super(path, data);
-        }
-
-        @Override
-        protected void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
-            transactionContext.executeMerge(path(), data(), havePermit);
-        }
-    }
-
-    static final class WriteOperation extends AbstractDataOperation {
-        WriteOperation(final YangInstanceIdentifier path, final NormalizedNode data) {
-            super(path, data);
-        }
-
-        @Override
-        protected void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
-            transactionContext.executeWrite(path(), data(), havePermit);
-        }
-    }
-
-    private final YangInstanceIdentifier path;
-
-    TransactionModificationOperation(final YangInstanceIdentifier path) {
-        this.path = requireNonNull(path);
-    }
-
-    final YangInstanceIdentifier path() {
-        return path;
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionOperation.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionOperation.java
deleted file mode 100644 (file)
index 962d261..0000000
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import org.eclipse.jdt.annotation.Nullable;
-
-/**
- * Abstract superclass for transaction operations which should be executed
- * on a {@link TransactionContext} at a later point in time.
- */
-abstract class TransactionOperation {
-    /**
-     * Execute the delayed operation.
-     *
-     * @param transactionContext the TransactionContext
-     * @param havePermit Boolean indicator if this operation has tried and acquired a permit, null if there was no
-     *                   attempt to acquire a permit.
-     */
-    protected abstract void invoke(TransactionContext transactionContext, @Nullable Boolean havePermit);
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
deleted file mode 100644 (file)
index d9fc2df..0000000
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.FluentFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.DeleteOperation;
-import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.MergeOperation;
-import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.WriteOperation;
-import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.controller.cluster.datastore.utils.RootScatterGather;
-import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-
-/**
- * A transaction potentially spanning multiple backend shards.
- */
-public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier>
-        implements DOMStoreReadWriteTransaction {
-    private enum TransactionState {
-        OPEN,
-        READY,
-        CLOSED,
-    }
-
-    private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
-    private static final DeleteOperation ROOT_DELETE_OPERATION = new DeleteOperation(YangInstanceIdentifier.of());
-
-    private final Map<String, AbstractTransactionContextWrapper> txContextWrappers = new TreeMap<>();
-    private final AbstractTransactionContextFactory<?> txContextFactory;
-    private final TransactionType type;
-    private TransactionState state = TransactionState.OPEN;
-
-    @VisibleForTesting
-    public TransactionProxy(final AbstractTransactionContextFactory<?> txContextFactory, final TransactionType type) {
-        super(txContextFactory.nextIdentifier(), txContextFactory.getActorUtils().getDatastoreContext()
-                .isTransactionDebugContextEnabled());
-        this.txContextFactory = txContextFactory;
-        this.type = requireNonNull(type);
-
-        LOG.debug("New {} Tx - {}", type, getIdentifier());
-    }
-
-    @Override
-    public FluentFuture<Boolean> exists(final YangInstanceIdentifier path) {
-        return executeRead(shardNameFromIdentifier(path), new DataExists(path, DataStoreVersions.CURRENT_VERSION));
-    }
-
-    private <T> FluentFuture<T> executeRead(final String shardName, final AbstractRead<T> readCmd) {
-        checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed");
-
-        LOG.trace("Tx {} {} {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath());
-
-        final SettableFuture<T> proxyFuture = SettableFuture.create();
-        AbstractTransactionContextWrapper contextWrapper = wrapperFromShardName(shardName);
-        contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
-            @Override
-            public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
-                transactionContext.executeRead(readCmd, proxyFuture, havePermit);
-            }
-        });
-
-        return FluentFuture.from(proxyFuture);
-    }
-
-    @Override
-    public FluentFuture<Optional<NormalizedNode>> read(final YangInstanceIdentifier path) {
-        checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed");
-        requireNonNull(path, "path should not be null");
-
-        LOG.trace("Tx {} read {}", getIdentifier(), path);
-        return path.isEmpty() ? readAllData() : singleShardRead(shardNameFromIdentifier(path), path);
-    }
-
-    private FluentFuture<Optional<NormalizedNode>> singleShardRead(final String shardName,
-            final YangInstanceIdentifier path) {
-        return executeRead(shardName, new ReadData(path, DataStoreVersions.CURRENT_VERSION));
-    }
-
-    private FluentFuture<Optional<NormalizedNode>> readAllData() {
-        final var actorUtils = getActorUtils();
-        return RootScatterGather.gather(actorUtils, actorUtils.getConfiguration().getAllShardNames().stream()
-            .map(shardName -> singleShardRead(shardName, YangInstanceIdentifier.of())));
-    }
-
-    @Override
-    public void delete(final YangInstanceIdentifier path) {
-        checkModificationState("delete", path);
-
-        if (path.isEmpty()) {
-            deleteAllData();
-        } else {
-            executeModification(new DeleteOperation(path));
-        }
-    }
-
-    private void deleteAllData() {
-        for (String shardName : getActorUtils().getConfiguration().getAllShardNames()) {
-            wrapperFromShardName(shardName).maybeExecuteTransactionOperation(ROOT_DELETE_OPERATION);
-        }
-    }
-
-    @Override
-    public void merge(final YangInstanceIdentifier path, final NormalizedNode data) {
-        checkModificationState("merge", path);
-
-        if (path.isEmpty()) {
-            mergeAllData(RootScatterGather.castRootNode(data));
-        } else {
-            executeModification(new MergeOperation(path, data));
-        }
-    }
-
-    private void mergeAllData(final ContainerNode rootData) {
-        if (!rootData.isEmpty()) {
-            RootScatterGather.scatterTouched(rootData, this::wrapperFromRootChild).forEach(
-                scattered -> scattered.shard().maybeExecuteTransactionOperation(
-                    new MergeOperation(YangInstanceIdentifier.of(), scattered.container())));
-        }
-    }
-
-    @Override
-    public void write(final YangInstanceIdentifier path, final NormalizedNode data) {
-        checkModificationState("write", path);
-
-        if (path.isEmpty()) {
-            writeAllData(RootScatterGather.castRootNode(data));
-        } else {
-            executeModification(new WriteOperation(path, data));
-        }
-    }
-
-    private void writeAllData(final ContainerNode rootData) {
-        RootScatterGather.scatterAll(rootData, this::wrapperFromRootChild,
-            getActorUtils().getConfiguration().getAllShardNames().stream().map(this::wrapperFromShardName)).forEach(
-                scattered -> scattered.shard().maybeExecuteTransactionOperation(
-                    new WriteOperation(YangInstanceIdentifier.of(), scattered.container())));
-    }
-
-    private void executeModification(final TransactionModificationOperation operation) {
-        wrapperFromShardName(shardNameFromIdentifier(operation.path())).maybeExecuteTransactionOperation(operation);
-    }
-
-    private void checkModificationState(final String opName, final YangInstanceIdentifier path) {
-        checkState(type != TransactionType.READ_ONLY, "Modification operation on read-only transaction is not allowed");
-        checkState(state == TransactionState.OPEN, "Transaction is sealed - further modifications are not allowed");
-        LOG.trace("Tx {} {} {}", getIdentifier(), opName, path);
-    }
-
-    private boolean seal(final TransactionState newState) {
-        if (state == TransactionState.OPEN) {
-            state = newState;
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public final void close() {
-        if (!seal(TransactionState.CLOSED)) {
-            checkState(state == TransactionState.CLOSED, "Transaction %s is ready, it cannot be closed",
-                getIdentifier());
-            // Idempotent no-op as per AutoCloseable recommendation
-            return;
-        }
-
-        for (AbstractTransactionContextWrapper contextWrapper : txContextWrappers.values()) {
-            contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
-                @Override
-                public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
-                    transactionContext.closeTransaction();
-                }
-            });
-        }
-
-
-        txContextWrappers.clear();
-    }
-
-    @Override
-    public final AbstractThreePhaseCommitCohort<?> ready() {
-        checkState(type != TransactionType.READ_ONLY, "Read-only transactions cannot be readied");
-
-        final boolean success = seal(TransactionState.READY);
-        checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
-
-        LOG.debug("Tx {} Readying {} components for commit", getIdentifier(), txContextWrappers.size());
-
-        final AbstractThreePhaseCommitCohort<?> ret = switch (txContextWrappers.size()) {
-            case 0 -> NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
-            case 1 -> {
-                final Entry<String, AbstractTransactionContextWrapper> e = Iterables.getOnlyElement(
-                        txContextWrappers.entrySet());
-                yield createSingleCommitCohort(e.getKey(), e.getValue());
-            }
-            default -> createMultiCommitCohort();
-        };
-        txContextFactory.onTransactionReady(getIdentifier(), ret.getCohortFutures());
-
-        final Throwable debugContext = getDebugContext();
-        return debugContext == null ? ret : new DebugThreePhaseCommitCohort(getIdentifier(), ret, debugContext);
-    }
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
-            final AbstractTransactionContextWrapper contextWrapper) {
-
-        LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
-
-        final OperationCallback.Reference operationCallbackRef =
-                new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
-
-        final TransactionContext transactionContext = contextWrapper.getTransactionContext();
-        final Future future;
-        if (transactionContext == null) {
-            final Promise promise = akka.dispatch.Futures.promise();
-            contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
-                @Override
-                public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
-                    promise.completeWith(getDirectCommitFuture(newTransactionContext, operationCallbackRef,
-                        havePermit));
-                }
-            });
-            future = promise.future();
-        } else {
-            // avoid the creation of a promise and a TransactionOperation
-            future = getDirectCommitFuture(transactionContext, operationCallbackRef, null);
-        }
-
-        return new SingleCommitCohortProxy(txContextFactory.getActorUtils(), future, getIdentifier(),
-            operationCallbackRef);
-    }
-
-    private Future<?> getDirectCommitFuture(final TransactionContext transactionContext,
-            final OperationCallback.Reference operationCallbackRef, final Boolean havePermit) {
-        TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(
-                txContextFactory.getActorUtils());
-        operationCallbackRef.set(rateLimitingCallback);
-        rateLimitingCallback.run();
-        return transactionContext.directCommit(havePermit);
-    }
-
-    private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {
-
-        final List<ThreePhaseCommitCohortProxy.CohortInfo> cohorts = new ArrayList<>(txContextWrappers.size());
-        final Optional<SortedSet<String>> shardNames = Optional.of(new TreeSet<>(txContextWrappers.keySet()));
-        for (Entry<String, AbstractTransactionContextWrapper> e : txContextWrappers.entrySet()) {
-            LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
-
-            final AbstractTransactionContextWrapper wrapper = e.getValue();
-
-            // The remote tx version is obtained the via TransactionContext which may not be available yet so
-            // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the
-            // TransactionContext is available.
-            cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(shardNames),
-                () -> wrapper.getTransactionContext().getTransactionVersion()));
-        }
-
-        return new ThreePhaseCommitCohortProxy(txContextFactory.getActorUtils(), cohorts, getIdentifier());
-    }
-
-    private String shardNameFromIdentifier(final YangInstanceIdentifier path) {
-        return getActorUtils().getShardStrategyFactory().getStrategy(path).findShard(path);
-    }
-
-    private AbstractTransactionContextWrapper wrapperFromRootChild(final PathArgument childId) {
-        return wrapperFromShardName(shardNameFromIdentifier(YangInstanceIdentifier.of(childId)));
-    }
-
-    private AbstractTransactionContextWrapper wrapperFromShardName(final String shardName) {
-        final AbstractTransactionContextWrapper existing = txContextWrappers.get(shardName);
-        if (existing != null) {
-            return existing;
-        }
-
-        final AbstractTransactionContextWrapper fresh = txContextFactory.newTransactionContextWrapper(this, shardName);
-        txContextWrappers.put(shardName, fresh);
-        return fresh;
-    }
-
-    TransactionType getType() {
-        return type;
-    }
-
-    boolean isReady() {
-        return state != TransactionState.OPEN;
-    }
-
-    final ActorUtils getActorUtils() {
-        return txContextFactory.getActorUtils();
-    }
-}
index eb8173ec8aca9802c1730b0147179dea49c7bb18..b8c570de7929e6af9a32b36648975407afe4635a 100644 (file)
@@ -1,13 +1,16 @@
 // vi: set smarttab et sw=4 tabstop=4:
 module distributed-datastore-provider {
-
     yang-version 1;
     namespace "urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider";
     prefix "distributed-datastore-provider";
 
     description
         "This module contains the base YANG definitions for
-        the distributed datastore provider implementation";
+         the distributed datastore provider implementation";
+
+    revision "2023-12-29" {
+        description "Remote use-tell-based-protocol leaf";
+    }
 
     revision "2014-06-12" {
         description
@@ -224,14 +227,6 @@ module distributed-datastore-provider {
                          maximum size in bytes for a message slice.";
         }
 
-        leaf use-tell-based-protocol {
-            status obsolete;
-            default false;
-            type boolean;
-            description "Use a newer protocol between the frontend and backend. This feature is considered
-                         exprerimental at this point.";
-        }
-
         leaf file-backed-streaming-threshold-in-megabytes {
             default 128;
             type non-zero-uint32-type;
index 3d00a2592b0a842b194f8e43a27a83b5d9de4bc3..50f51be9a761205946183df8b71377cb73f55691 100644 (file)
@@ -47,7 +47,6 @@ import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMe
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
-import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
@@ -238,13 +237,13 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
 
     @Test
     public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
-        final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
+        final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
+        try (var dataStore = testKit.setupAbstractDataStore(testParameter,
+                "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
 
-            final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+            final var txChain = dataStore.createTransactionChain();
 
-            DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+            var writeTx = txChain.newWriteOnlyTransaction();
             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
             testKit.doCommit(writeTx.ready());
@@ -267,16 +266,16 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
                 .untilAsserted(() -> {
                     // verify frontend metadata has no holes in purged transactions causing overtime memory leak
                     final var localShard = dataStore.getActorUtils().findLocalShard("cars-1") .orElseThrow();
-                    FrontendShardDataTreeSnapshotMetadata frontendMetadata =
-                        (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
+                    final var frontendMetadata = (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
                             .executeOperation(localShard, new RequestFrontendMetadata());
 
                     final var clientMeta = frontendMetadata.getClients().get(0);
-                    if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
-                        assertTellMetadata(clientMeta);
-                    } else {
-                        assertAskMetadata(clientMeta);
+                    final var iterator = clientMeta.getCurrentHistories().iterator();
+                    var metadata = iterator.next();
+                    while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+                        metadata = iterator.next();
                     }
+                    assertEquals("[[0..10]]", metadata.getPurgedTransactions().ranges().toString());
                 });
 
             final var body = txChain.newReadOnlyTransaction().read(CarsModel.CAR_LIST_PATH)
@@ -288,20 +287,6 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
         }
     }
 
-    private static void assertAskMetadata(final FrontendClientMetadata clientMeta) {
-        // ask based should track no metadata
-        assertEquals(List.of(), clientMeta.getCurrentHistories());
-    }
-
-    private static void assertTellMetadata(final FrontendClientMetadata clientMeta) {
-        final var iterator = clientMeta.getCurrentHistories().iterator();
-        var metadata = iterator.next();
-        while (iterator.hasNext() && metadata.getHistoryId() != 1) {
-            metadata = iterator.next();
-        }
-        assertEquals("[[0..10]]", metadata.getPurgedTransactions().ranges().toString());
-    }
-
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
             throws Exception {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
deleted file mode 100644 (file)
index d3ae761..0000000
+++ /dev/null
@@ -1,479 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.dispatch.Futures;
-import akka.testkit.javadsl.TestKit;
-import akka.util.Timeout;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Timer;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.FluentFuture;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.mockito.ArgumentCaptor;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
-import org.opendaylight.controller.cluster.datastore.TransactionProxyTest.TestException;
-import org.opendaylight.controller.cluster.datastore.config.Configuration;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
-import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.mdsal.common.api.ReadFailedException;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Abstract base class for TransactionProxy unit tests.
- *
- * @author Thomas Pantelis
- */
-public abstract class AbstractTransactionProxyTest extends AbstractTest {
-    protected final Logger log = LoggerFactory.getLogger(getClass());
-
-    private static ActorSystem system;
-    private static SchemaContext SCHEMA_CONTEXT;
-
-    private final Configuration configuration = new MockConfiguration() {
-        Map<String, ShardStrategy> strategyMap = ImmutableMap.<String, ShardStrategy>builder().put(
-                TestModel.JUNK_QNAME.getLocalName(), new ShardStrategy() {
-                    @Override
-                    public String findShard(final YangInstanceIdentifier path) {
-                        return TestModel.JUNK_QNAME.getLocalName();
-                    }
-                }).put(
-                CarsModel.BASE_QNAME.getLocalName(), new ShardStrategy() {
-                    @Override
-                    public String findShard(final YangInstanceIdentifier path) {
-                        return CarsModel.BASE_QNAME.getLocalName();
-                    }
-                }).build();
-
-        @Override
-        public ShardStrategy getStrategyForModule(final String moduleName) {
-            return strategyMap.get(moduleName);
-        }
-
-        @Override
-        public String getModuleNameFromNameSpace(final String nameSpace) {
-            if (TestModel.JUNK_QNAME.getNamespace().toString().equals(nameSpace)) {
-                return TestModel.JUNK_QNAME.getLocalName();
-            } else if (CarsModel.BASE_QNAME.getNamespace().toString().equals(nameSpace)) {
-                return CarsModel.BASE_QNAME.getLocalName();
-            }
-            return null;
-        }
-    };
-
-    @Mock
-    protected ActorUtils mockActorContext;
-
-    protected TransactionContextFactory mockComponentFactory;
-
-    @Mock
-    private ClusterWrapper mockClusterWrapper;
-
-    protected final String memberName = "mock-member";
-
-    private final int operationTimeoutInSeconds = 2;
-    protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder()
-            .operationTimeoutInSeconds(operationTimeoutInSeconds);
-
-    @BeforeClass
-    public static void setUpClass() {
-
-        Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
-                .put("akka.actor.default-dispatcher.type",
-                        "akka.testkit.CallingThreadDispatcherConfigurator").build())
-                .withFallback(ConfigFactory.load());
-        system = ActorSystem.create("test", config);
-        SCHEMA_CONTEXT = TestModel.createTestContext();
-    }
-
-    @AfterClass
-    public static void tearDownClass() {
-        TestKit.shutdownActorSystem(system);
-        system = null;
-        SCHEMA_CONTEXT = null;
-    }
-
-    @Before
-    public void setUp() {
-        MockitoAnnotations.initMocks(this);
-
-        doReturn(getSystem()).when(mockActorContext).getActorSystem();
-        doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
-        doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
-        doReturn(new ShardStrategyFactory(configuration)).when(mockActorContext).getShardStrategyFactory();
-        doReturn(SCHEMA_CONTEXT).when(mockActorContext).getSchemaContext();
-        doReturn(new Timeout(operationTimeoutInSeconds, TimeUnit.SECONDS)).when(mockActorContext).getOperationTimeout();
-        doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
-        doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
-        doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
-        doReturn(new Timeout(5, TimeUnit.SECONDS)).when(mockActorContext).getTransactionCommitOperationTimeout();
-
-        final ClientIdentifier mockClientId = MockIdentifiers.clientIdentifier(getClass(), memberName);
-        mockComponentFactory = new TransactionContextFactory(mockActorContext, mockClientId);
-
-        Timer timer = new MetricRegistry().timer("test");
-        doReturn(timer).when(mockActorContext).getOperationTimer(any(String.class));
-    }
-
-    protected ActorSystem getSystem() {
-        return system;
-    }
-
-    protected CreateTransaction eqCreateTransaction(final String expMemberName,
-            final TransactionType type) {
-        class CreateTransactionArgumentMatcher implements ArgumentMatcher<CreateTransaction> {
-            @Override
-            public boolean matches(final CreateTransaction argument) {
-                return argument.getTransactionId().getHistoryId().getClientId().getFrontendId().getMemberName()
-                        .getName().equals(expMemberName) && argument.getTransactionType() == type.ordinal();
-            }
-        }
-
-        return argThat(new CreateTransactionArgumentMatcher());
-    }
-
-    protected DataExists eqDataExists() {
-        class DataExistsArgumentMatcher implements ArgumentMatcher<DataExists> {
-            @Override
-            public boolean matches(final DataExists argument) {
-                return argument.getPath().equals(TestModel.TEST_PATH);
-            }
-        }
-
-        return argThat(new DataExistsArgumentMatcher());
-    }
-
-    protected ReadData eqReadData() {
-        return eqReadData(TestModel.TEST_PATH);
-    }
-
-    protected ReadData eqReadData(final YangInstanceIdentifier path) {
-        class ReadDataArgumentMatcher implements ArgumentMatcher<ReadData> {
-            @Override
-            public boolean matches(final ReadData argument) {
-                return argument.getPath().equals(path);
-            }
-        }
-
-        return argThat(new ReadDataArgumentMatcher());
-    }
-
-    protected Future<Object> readyTxReply(final String path) {
-        return Futures.successful((Object)new ReadyTransactionReply(path));
-    }
-
-
-    protected Future<ReadDataReply> readDataReply(final NormalizedNode data) {
-        return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION));
-    }
-
-    protected Future<DataExistsReply> dataExistsReply(final boolean exists) {
-        return Futures.successful(new DataExistsReply(exists, DataStoreVersions.CURRENT_VERSION));
-    }
-
-    protected Future<BatchedModificationsReply> batchedModificationsReply(final int count) {
-        return Futures.successful(new BatchedModificationsReply(count));
-    }
-
-    @SuppressWarnings("unchecked")
-    protected Future<Object> incompleteFuture() {
-        return mock(Future.class);
-    }
-
-    protected ActorSelection actorSelection(final ActorRef actorRef) {
-        return getSystem().actorSelection(actorRef.path());
-    }
-
-    protected void expectBatchedModifications(final ActorRef actorRef, final int count) {
-        doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
-    }
-
-    protected void expectBatchedModifications(final int count) {
-        doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
-                any(ActorSelection.class), isA(BatchedModifications.class), any(Timeout.class));
-    }
-
-    protected void expectBatchedModificationsReady(final ActorRef actorRef) {
-        expectBatchedModificationsReady(actorRef, false);
-    }
-
-    protected void expectBatchedModificationsReady(final ActorRef actorRef, final boolean doCommitOnReady) {
-        doReturn(doCommitOnReady ? Futures.successful(new CommitTransactionReply().toSerializable()) :
-            readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-                    eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
-    }
-
-    protected void expectIncompleteBatchedModifications() {
-        doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                any(ActorSelection.class), isA(BatchedModifications.class), any(Timeout.class));
-    }
-
-    protected void expectFailedBatchedModifications(final ActorRef actorRef) {
-        doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
-    }
-
-    protected void expectReadyLocalTransaction(final ActorRef actorRef, final boolean doCommitOnReady) {
-        doReturn(doCommitOnReady ? Futures.successful(new CommitTransactionReply().toSerializable()) :
-            readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-                    eq(actorSelection(actorRef)), isA(ReadyLocalTransaction.class), any(Timeout.class));
-    }
-
-    protected CreateTransactionReply createTransactionReply(final ActorRef actorRef, final short transactionVersion) {
-        return new CreateTransactionReply(actorRef.path().toString(), nextTransactionId(), transactionVersion);
-    }
-
-    protected ActorRef setupActorContextWithoutInitialCreateTransaction(final ActorSystem actorSystem) {
-        return setupActorContextWithoutInitialCreateTransaction(actorSystem, DefaultShardStrategy.DEFAULT_SHARD);
-    }
-
-    protected ActorRef setupActorContextWithoutInitialCreateTransaction(final ActorSystem actorSystem,
-            final String shardName) {
-        return setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName,
-                DataStoreVersions.CURRENT_VERSION);
-    }
-
-    protected ActorRef setupActorContextWithoutInitialCreateTransaction(final ActorSystem actorSystem,
-            final String shardName, final short transactionVersion) {
-        ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-        log.info("Created mock shard actor {}", actorRef);
-
-        doReturn(actorSystem.actorSelection(actorRef.path()))
-                .when(mockActorContext).actorSelection(actorRef.path().toString());
-
-        doReturn(primaryShardInfoReply(actorSystem, actorRef, transactionVersion))
-                .when(mockActorContext).findPrimaryShardAsync(eq(shardName));
-
-        return actorRef;
-    }
-
-    protected Future<PrimaryShardInfo> primaryShardInfoReply(final ActorSystem actorSystem, final ActorRef actorRef) {
-        return primaryShardInfoReply(actorSystem, actorRef, DataStoreVersions.CURRENT_VERSION);
-    }
-
-    protected Future<PrimaryShardInfo> primaryShardInfoReply(final ActorSystem actorSystem, final ActorRef actorRef,
-            final short transactionVersion) {
-        return Futures.successful(new PrimaryShardInfo(actorSystem.actorSelection(actorRef.path()),
-                transactionVersion));
-    }
-
-    protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
-            final TransactionType type, final short transactionVersion, final String shardName) {
-        ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName,
-                transactionVersion);
-
-        return setupActorContextWithInitialCreateTransaction(actorSystem, type, transactionVersion,
-                memberName, shardActorRef);
-    }
-
-    protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
-            final TransactionType type, final short transactionVersion, final String prefix,
-            final ActorRef shardActorRef) {
-
-        ActorRef txActorRef;
-        if (type == TransactionType.WRITE_ONLY
-                && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled()) {
-            txActorRef = shardActorRef;
-        } else {
-            txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-            log.info("Created mock shard Tx actor {}", txActorRef);
-
-            doReturn(actorSystem.actorSelection(txActorRef.path()))
-                .when(mockActorContext).actorSelection(txActorRef.path().toString());
-
-            doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext)
-                .executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
-                        eqCreateTransaction(prefix, type), any(Timeout.class));
-        }
-
-        return txActorRef;
-    }
-
-    protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
-            final TransactionType type) {
-        return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION,
-                DefaultShardStrategy.DEFAULT_SHARD);
-    }
-
-    protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
-            final TransactionType type,
-            final String shardName) {
-        return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION,
-                shardName);
-    }
-
-    @SuppressWarnings({"checkstyle:avoidHidingCauseException", "checkstyle:IllegalThrows"})
-    protected void propagateReadFailedExceptionCause(final FluentFuture<?> future) throws Throwable {
-        try {
-            future.get(5, TimeUnit.SECONDS);
-            fail("Expected ReadFailedException");
-        } catch (ExecutionException e) {
-            final Throwable cause = e.getCause();
-            assertTrue("Unexpected cause: " + cause.getClass(), cause instanceof ReadFailedException);
-            throw Throwables.getRootCause(cause);
-        }
-    }
-
-    protected List<BatchedModifications> captureBatchedModifications(final ActorRef actorRef) {
-        ArgumentCaptor<BatchedModifications> batchedModificationsCaptor =
-                ArgumentCaptor.forClass(BatchedModifications.class);
-        verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync(
-                eq(actorSelection(actorRef)), batchedModificationsCaptor.capture(), any(Timeout.class));
-
-        List<BatchedModifications> batchedModifications = filterCaptured(
-                batchedModificationsCaptor, BatchedModifications.class);
-        return batchedModifications;
-    }
-
-    protected <T> List<T> filterCaptured(final ArgumentCaptor<T> captor, final Class<T> type) {
-        List<T> captured = new ArrayList<>();
-        for (T c: captor.getAllValues()) {
-            if (type.isInstance(c)) {
-                captured.add(c);
-            }
-        }
-
-        return captured;
-    }
-
-    protected void verifyOneBatchedModification(final ActorRef actorRef, final Modification expected,
-            final boolean expIsReady) {
-        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
-        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
-
-        verifyBatchedModifications(batchedModifications.get(0), expIsReady, expIsReady, expected);
-    }
-
-    protected void verifyBatchedModifications(final Object message, final boolean expIsReady,
-            final Modification... expected) {
-        verifyBatchedModifications(message, expIsReady, false, expected);
-    }
-
-    protected void verifyBatchedModifications(final Object message, final boolean expIsReady,
-            final boolean expIsDoCommitOnReady, final Modification... expected) {
-        assertEquals("Message type", BatchedModifications.class, message.getClass());
-        BatchedModifications batchedModifications = (BatchedModifications)message;
-        assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
-        assertEquals("isReady", expIsReady, batchedModifications.isReady());
-        assertEquals("isDoCommitOnReady", expIsDoCommitOnReady, batchedModifications.isDoCommitOnReady());
-        for (int i = 0; i < batchedModifications.getModifications().size(); i++) {
-            Modification actual = batchedModifications.getModifications().get(i);
-            assertEquals("Modification type", expected[i].getClass(), actual.getClass());
-            assertEquals("getPath", ((AbstractModification)expected[i]).getPath(),
-                    ((AbstractModification)actual).getPath());
-            if (actual instanceof WriteModification) {
-                assertEquals("getData", ((WriteModification)expected[i]).getData(),
-                        ((WriteModification)actual).getData());
-            }
-        }
-    }
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    protected void verifyCohortFutures(final AbstractThreePhaseCommitCohort<?> proxy,
-            final Object... expReplies) {
-        assertEquals("getReadyOperationFutures size", expReplies.length,
-                proxy.getCohortFutures().size());
-
-        List<Object> futureResults = new ArrayList<>();
-        for (Future<?> future : proxy.getCohortFutures()) {
-            assertNotNull("Ready operation Future is null", future);
-            try {
-                futureResults.add(Await.result(future, FiniteDuration.create(5, TimeUnit.SECONDS)));
-            } catch (Exception e) {
-                futureResults.add(e);
-            }
-        }
-
-        for (Object expReply : expReplies) {
-            boolean found = false;
-            Iterator<?> iter = futureResults.iterator();
-            while (iter.hasNext()) {
-                Object actual = iter.next();
-                if (CommitTransactionReply.isSerializedType(expReply)
-                        && CommitTransactionReply.isSerializedType(actual)
-                        || expReply instanceof ActorSelection && Objects.equals(expReply, actual)) {
-                    found = true;
-                } else if (expReply instanceof Class && ((Class<?>) expReply).isInstance(actual)) {
-                    found = true;
-                }
-
-                if (found) {
-                    iter.remove();
-                    break;
-                }
-            }
-
-            if (!found) {
-                fail(String.format("No cohort Future response found for %s. Actual: %s", expReply, futureResults));
-            }
-        }
-    }
-}
index 924f1de1ff4730627fdb4e19717666c3f083279b..e271c98c63355f7e70ed752ef3631a3e879aeee0 100644 (file)
@@ -25,8 +25,8 @@ import java.util.Map;
 import org.junit.Test;
 import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext;
 import org.opendaylight.mdsal.binding.runtime.spi.BindingRuntimeHelpers;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStorePropertiesContainer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStoreProperties.ExportOnRecovery;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStorePropertiesContainer;
 
 /**
  * Unit tests for DatastoreContextIntrospector.
index 50bd7909367ff9ea64c61077e776fdebcf2c0b91..611da694c3279812471fbe2c4b44207312d374aa 100644 (file)
@@ -34,7 +34,7 @@ import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEF
 import java.util.concurrent.TimeUnit;
 import org.junit.Assert;
 import org.junit.Test;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStoreProperties.ExportOnRecovery;
 
 public class DatastoreContextTest {
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DebugThreePhaseCommitCohortTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DebugThreePhaseCommitCohortTest.java
deleted file mode 100644 (file)
index 16dc540..0000000
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertSame;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.same;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.verify;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import java.util.ArrayList;
-import java.util.List;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.slf4j.Logger;
-import scala.concurrent.Future;
-
-/**
- * Unit tests for DebugThreePhaseCommitCohort.
- *
- * @author Thomas Pantelis
- */
-public class DebugThreePhaseCommitCohortTest {
-    private final TransactionIdentifier transactionId = MockIdentifiers.transactionIdentifier(
-        DebugThreePhaseCommitCohortTest.class, "mock");
-
-    @Test
-    public void test() {
-        AbstractThreePhaseCommitCohort<?> mockDelegate = mock(AbstractThreePhaseCommitCohort.class);
-        Exception failure = new Exception("mock failure");
-        ListenableFuture<Object> expFailedFuture = Futures.immediateFailedFuture(failure);
-        doReturn(expFailedFuture).when(mockDelegate).canCommit();
-        doReturn(expFailedFuture).when(mockDelegate).preCommit();
-        doReturn(expFailedFuture).when(mockDelegate).commit();
-
-        ListenableFuture<Object> expAbortFuture = Futures.immediateFuture(null);
-        doReturn(expAbortFuture).when(mockDelegate).abort();
-
-        List<Future<Object>> expCohortFutures = new ArrayList<>();
-        doReturn(expCohortFutures).when(mockDelegate).getCohortFutures();
-
-        Throwable debugContext = new RuntimeException("mock");
-        DebugThreePhaseCommitCohort cohort = new DebugThreePhaseCommitCohort(transactionId, mockDelegate, debugContext);
-
-        Logger mockLogger = mock(Logger.class);
-        cohort.setLogger(mockLogger);
-
-        assertSame("canCommit", expFailedFuture, cohort.canCommit());
-        verify(mockLogger).warn(anyString(), same(transactionId), same(failure), same(debugContext));
-
-        reset(mockLogger);
-        assertSame("preCommit", expFailedFuture, cohort.preCommit());
-        verify(mockLogger).warn(anyString(), same(transactionId), same(failure), same(debugContext));
-
-        reset(mockLogger);
-        assertSame("commit", expFailedFuture, cohort.commit());
-        verify(mockLogger).warn(anyString(), same(transactionId), same(failure), same(debugContext));
-
-        assertSame("abort", expAbortFuture, cohort.abort());
-
-        assertSame("getCohortFutures", expCohortFutures, cohort.getCohortFutures());
-
-        reset(mockLogger);
-        ListenableFuture<Boolean> expSuccessFuture = Futures.immediateFuture(Boolean.TRUE);
-        doReturn(expSuccessFuture).when(mockDelegate).canCommit();
-
-        assertSame("canCommit", expSuccessFuture, cohort.canCommit());
-        verify(mockLogger, never()).warn(anyString(), any(TransactionIdentifier.class), any(Throwable.class),
-                any(Throwable.class));
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DelayedTransactionContextWrapperTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DelayedTransactionContextWrapperTest.java
deleted file mode 100644 (file)
index 0d0b03c..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class DelayedTransactionContextWrapperTest {
-    @Mock
-    private ActorUtils actorUtils;
-
-    @Mock
-    private TransactionContext transactionContext;
-
-    private DelayedTransactionContextWrapper transactionContextWrapper;
-
-    @Before
-    public void setUp() {
-        doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
-        transactionContextWrapper = new DelayedTransactionContextWrapper(MockIdentifiers.transactionIdentifier(
-            DelayedTransactionContextWrapperTest.class, "mock"), actorUtils, "mock");
-    }
-
-    @Test
-    public void testExecutePriorTransactionOperations() {
-        for (int i = 0; i < 100; i++) {
-            transactionContextWrapper.maybeExecuteTransactionOperation(mock(TransactionOperation.class));
-        }
-        assertEquals(901, transactionContextWrapper.getLimiter().availablePermits());
-
-        transactionContextWrapper.executePriorTransactionOperations(transactionContext);
-
-        assertEquals(1001, transactionContextWrapper.getLimiter().availablePermits());
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DirectTransactionContextWrapperTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DirectTransactionContextWrapperTest.java
deleted file mode 100644 (file)
index 44f246f..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class DirectTransactionContextWrapperTest {
-    @Mock
-    private ActorUtils actorUtils;
-
-    @Mock
-    private TransactionContext transactionContext;
-
-    @Mock
-    private TransactionOperation transactionOperation;
-
-    private DirectTransactionContextWrapper contextWrapper;
-
-    @Before
-    public void setUp() {
-        doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
-        contextWrapper = new DirectTransactionContextWrapper(MockIdentifiers.transactionIdentifier(
-                DirectTransactionContextWrapperTest.class, "mock"), actorUtils, "mock",
-                transactionContext);
-    }
-
-    @Test
-    public void testMaybeExecuteTransactionOperation() {
-        contextWrapper.maybeExecuteTransactionOperation(transactionOperation);
-        verify(transactionOperation, times(1)).invoke(transactionContext, null);
-    }
-}
index 45e5a8bf6dab6c6e5d8cd796924bbc832a087606..a6ff215b8b760e0facb7fb5ac2a899ef9e54dc22 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.awaitility.Awaitility.await;
-import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
@@ -65,14 +64,12 @@ import org.junit.runners.Parameterized.Parameters;
 import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
 import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
 import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages;
 import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
@@ -373,21 +370,15 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         // wait to let the shard catch up with purged
         await("Range set leak test").atMost(5, TimeUnit.SECONDS)
-                .pollInterval(500, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> {
-                    final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars")
-                        .orElseThrow();
-                    final var frontendMetadata =
-                        (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
-                            .executeOperation(localShard, new RequestFrontendMetadata());
-
-                    final var clientMeta = frontendMetadata.getClients().get(0);
-                    if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
-                        assertTellClientMetadata(clientMeta, numCars * 2);
-                    } else {
-                        assertAskClientMetadata(clientMeta);
-                    }
-                });
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> {
+                final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
+                final var frontendMetadata =
+                    (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+                    .executeOperation(localShard, new RequestFrontendMetadata());
+
+                assertClientMetadata(frontendMetadata.getClients().get(0), numCars * 2);
+            });
 
         try (var tx = txChain.newReadOnlyTransaction()) {
             final var body = tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body();
@@ -396,12 +387,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         }
     }
 
-    private static void assertAskClientMetadata(final FrontendClientMetadata clientMeta) {
-        // ask based should track no metadata
-        assertEquals(List.of(), clientMeta.getCurrentHistories());
-    }
-
-    private static void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) {
+    private static void assertClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) {
         final var iterator = clientMeta.getCurrentHistories().iterator();
         var metadata = iterator.next();
         while (iterator.hasNext() && metadata.getHistoryId() != 1) {
@@ -414,19 +400,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testCloseTransactionMetadataLeak() throws Exception {
-        // FIXME: CONTROLLER-2016: ask-based frontend triggers this:
-        //
-        // java.lang.IllegalStateException: Previous transaction
-        //            member-2-datastore-testCloseTransactionMetadataLeak-fe-0-chn-1-txn-1-0 is not ready yet
-        //        at org.opendaylight.controller.cluster.datastore.TransactionChainProxy$Allocated.checkReady()
-        //        at org.opendaylight.controller.cluster.datastore.TransactionChainProxy.newReadOnlyTransaction()
-        assumeTrue(testParameter.isAssignableFrom(ClientBackedDataStore.class));
-
         initDatastoresWithCars("testCloseTransactionMetadataLeak");
 
-        final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+        final var txChain = followerDistributedDataStore.createTransactionChain();
 
-        DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        var writeTx = txChain.newWriteOnlyTransaction();
         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
         writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
         followerTestKit.doCommit(writeTx.ready());
@@ -444,21 +422,15 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         // wait to let the shard catch up with purged
         await("wait for purges to settle").atMost(5, TimeUnit.SECONDS)
-                .pollInterval(500, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> {
-                    final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars")
-                        .orElseThrow();
-                    final var frontendMetadata =
-                            (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
-                                    .executeOperation(localShard, new RequestFrontendMetadata());
-
-                    final var clientMeta = frontendMetadata.getClients().get(0);
-                    if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
-                        assertTellClientMetadata(clientMeta, numCars * 2);
-                    } else {
-                        assertAskClientMetadata(clientMeta);
-                    }
-                });
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> {
+                final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
+                final var frontendMetadata =
+                    (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+                    .executeOperation(localShard, new RequestFrontendMetadata());
+
+                assertClientMetadata(frontendMetadata.getClients().get(0), numCars * 2);
+            });
     }
 
     @Test
@@ -1113,23 +1085,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState()));
 
         final var noShardLeaderCohort = noShardLeaderWriteTx.ready();
-        final ListenableFuture<Boolean> canCommit;
-
-        // There is difference in behavior here:
-        if (!leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
-            // ask-based canCommit() times out and aborts
-            final var ex = assertThrows(ExecutionException.class,
-                () -> leaderTestKit.doCommit(noShardLeaderCohort)).getCause();
-            assertThat(ex, instanceOf(NoShardLeaderException.class));
-            assertThat(ex.getMessage(), containsString(
-                "Shard member-1-shard-cars-testTransactionWithIsolatedLeader currently has no leader."));
-            canCommit = null;
-        } else {
-            // tell-based canCommit() does not have a real timeout and hence continues
-            canCommit = noShardLeaderCohort.canCommit();
-            Uninterruptibles.sleepUninterruptibly(commitTimeout, TimeUnit.SECONDS);
-            assertFalse(canCommit.isDone());
-        }
+        // tell-based canCommit() does not have a real timeout and hence continues
+        final var canCommit = noShardLeaderCohort.canCommit();
+        Uninterruptibles.sleepUninterruptibly(commitTimeout, TimeUnit.SECONDS);
+        assertFalse(canCommit.isDone());
 
         sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
                 .shardElectionTimeoutFactor(100));
@@ -1142,18 +1101,16 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         leaderTestKit.doCommit(preIsolatedLeaderTxCohort);
         leaderTestKit.doCommit(successTxCohort);
 
-        // continuation of tell-based protocol: readied transaction will complete commit, but will report an OLFE
-        if (canCommit != null) {
-            final var ex = assertThrows(ExecutionException.class,
-                () -> canCommit.get(commitTimeout, TimeUnit.SECONDS)).getCause();
-            assertThat(ex, instanceOf(OptimisticLockFailedException.class));
-            assertEquals("Optimistic lock failed for path " + CarsModel.BASE_PATH, ex.getMessage());
-            final var cause = ex.getCause();
-            assertThat(cause, instanceOf(ConflictingModificationAppliedException.class));
-            final var cmae = (ConflictingModificationAppliedException) cause;
-            assertEquals("Node was created by other transaction.", cmae.getMessage());
-            assertEquals(CarsModel.BASE_PATH, cmae.getPath());
-        }
+        // continuation of canCommit(): readied transaction will complete commit, but will report an OLFE
+        final var ex = assertThrows(ExecutionException.class,
+            () -> canCommit.get(commitTimeout, TimeUnit.SECONDS)).getCause();
+        assertThat(ex, instanceOf(OptimisticLockFailedException.class));
+        assertEquals("Optimistic lock failed for path " + CarsModel.BASE_PATH, ex.getMessage());
+        final var cause = ex.getCause();
+        assertThat(cause, instanceOf(ConflictingModificationAppliedException.class));
+        final var cmae = (ConflictingModificationAppliedException) cause;
+        assertEquals("Node was created by other transaction.", cmae.getMessage());
+        assertEquals(CarsModel.BASE_PATH, cmae.getPath());
     }
 
     @Test
@@ -1354,9 +1311,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testReadWriteMessageSlicing() throws Exception {
-        // The slicing is only implemented for tell-based protocol
-        assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter));
-
         leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
         followerDatastoreContextBuilder.maximumMessageSliceSize(100);
         initDatastoresWithCars("testLargeReadReplySlicing");
index 296a4dea29cde8bdb995a0f70cc5b9f810bcfec4..855c16599fc88de22d116b65e590c44434987e0c 100644 (file)
@@ -30,7 +30,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
@@ -111,16 +110,12 @@ public class IntegrationTestKit extends ShardTestKit {
                                                      final String typeName, final String moduleShardsConfig,
                                                      final String modulesConfig, final boolean waitUntilLeader,
                                                      final EffectiveModelContext schemaContext,
-                                                     final String... shardNames)
-            throws Exception {
+                                                     final String... shardNames) throws Exception {
         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
         final Configuration config = new ConfigurationImpl(moduleShardsConfig, modulesConfig);
 
         setDataStoreName(typeName);
 
-        // Make sure we set up datastore context correctly
-        datastoreContextBuilder.useTellBasedProtocol(ClientBackedDataStore.class.isAssignableFrom(implementation));
-
         final DatastoreContext datastoreContext = datastoreContextBuilder.build();
         final DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
         doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
index 845a39ad91211735ae9bfae7ed585f2aea860c26..8b30a85566f97e117566af136c9976073f3382b8 100644 (file)
@@ -25,7 +25,7 @@ import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStoreProperties.ExportOnRecovery;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java
deleted file mode 100644 (file)
index 3c32121..0000000
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import akka.actor.ActorSelection;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.Optional;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.FrontendType;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.util.concurrent.FluentFutures;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import scala.concurrent.Future;
-
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class LocalTransactionContextTest {
-    @Mock
-    private DOMStoreReadWriteTransaction readWriteTransaction;
-    @Mock
-    private LocalTransactionReadySupport mockReadySupport;
-
-    private LocalTransactionContext localTransactionContext;
-
-    @Before
-    public void setUp() {
-        final TransactionIdentifier txId = new TransactionIdentifier(new LocalHistoryIdentifier(ClientIdentifier.create(
-            FrontendIdentifier.create(MemberName.forName("member"), FrontendType.forName("type")), 0), 0), 0);
-
-        localTransactionContext = new LocalTransactionContext(readWriteTransaction, txId, mockReadySupport) {
-            @Override
-            DOMStoreWriteTransaction getWriteDelegate() {
-                return readWriteTransaction;
-            }
-
-            @Override
-            DOMStoreReadTransaction getReadDelegate() {
-                return readWriteTransaction;
-            }
-        };
-    }
-
-    @Test
-    public void testWrite() {
-        YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.of();
-        NormalizedNode normalizedNode = mock(NormalizedNode.class);
-        localTransactionContext.executeWrite(yangInstanceIdentifier, normalizedNode, null);
-        verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
-    }
-
-    @Test
-    public void testMerge() {
-        YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.of();
-        NormalizedNode normalizedNode = mock(NormalizedNode.class);
-        localTransactionContext.executeMerge(yangInstanceIdentifier, normalizedNode, null);
-        verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
-    }
-
-    @Test
-    public void testDelete() {
-        YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.of();
-        localTransactionContext.executeDelete(yangInstanceIdentifier, null);
-        verify(readWriteTransaction).delete(yangInstanceIdentifier);
-    }
-
-    @Test
-    public void testRead() {
-        YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.of();
-        NormalizedNode normalizedNode = mock(NormalizedNode.class);
-        doReturn(FluentFutures.immediateFluentFuture(Optional.of(normalizedNode))).when(readWriteTransaction)
-            .read(yangInstanceIdentifier);
-        localTransactionContext.executeRead(new ReadData(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
-                SettableFuture.create(), null);
-        verify(readWriteTransaction).read(yangInstanceIdentifier);
-    }
-
-    @Test
-    public void testExists() {
-        YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.of();
-        doReturn(FluentFutures.immediateTrueFluentFuture()).when(readWriteTransaction).exists(yangInstanceIdentifier);
-        localTransactionContext.executeRead(new DataExists(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
-                SettableFuture.create(), null);
-        verify(readWriteTransaction).exists(yangInstanceIdentifier);
-    }
-
-    @Test
-    public void testReady() {
-        final LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
-        doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit(Optional.empty());
-        doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, null);
-
-        Future<ActorSelection> future = localTransactionContext.readyTransaction(null, Optional.empty());
-        assertTrue(future.isCompleted());
-
-        verify(mockReadySupport).onTransactionReady(readWriteTransaction, null);
-    }
-
-    @Test
-    public void testReadyWithWriteError() {
-        YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.of();
-        NormalizedNode normalizedNode = mock(NormalizedNode.class);
-        RuntimeException error = new RuntimeException("mock");
-        doThrow(error).when(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
-
-        localTransactionContext.executeWrite(yangInstanceIdentifier, normalizedNode, null);
-        localTransactionContext.executeWrite(yangInstanceIdentifier, normalizedNode, null);
-
-        verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
-
-        doReadyWithExpectedError(error);
-    }
-
-    @Test
-    public void testReadyWithMergeError() {
-        YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.of();
-        NormalizedNode normalizedNode = mock(NormalizedNode.class);
-        RuntimeException error = new RuntimeException("mock");
-        doThrow(error).when(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
-
-        localTransactionContext.executeMerge(yangInstanceIdentifier, normalizedNode, null);
-        localTransactionContext.executeMerge(yangInstanceIdentifier, normalizedNode, null);
-
-        verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
-
-        doReadyWithExpectedError(error);
-    }
-
-    @Test
-    public void testReadyWithDeleteError() {
-        YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.of();
-        RuntimeException error = new RuntimeException("mock");
-        doThrow(error).when(readWriteTransaction).delete(yangInstanceIdentifier);
-
-        localTransactionContext.executeDelete(yangInstanceIdentifier, null);
-        localTransactionContext.executeDelete(yangInstanceIdentifier, null);
-
-        verify(readWriteTransaction).delete(yangInstanceIdentifier);
-
-        doReadyWithExpectedError(error);
-    }
-
-    private void doReadyWithExpectedError(final RuntimeException expError) {
-        LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
-        doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit(Optional.empty());
-        doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, expError);
-
-        localTransactionContext.readyTransaction(null, Optional.empty());
-    }
-}
@@ -5,38 +5,16 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.datastore;
 
 import java.util.concurrent.atomic.AtomicReference;
 
+@Deprecated(since = "9.0.0", forRemoval = true)
 interface OperationCallback {
-    OperationCallback NO_OP_CALLBACK = new OperationCallback() {
-        @Override
-        public void run() {
-        }
-
-        @Override
-        public void success() {
-        }
-
-        @Override
-        public void failure() {
-        }
-
-        @Override
-        public void pause() {
-        }
-
-        @Override
-        public void resume() {
-        }
-    };
-
     class Reference extends AtomicReference<OperationCallback> {
         private static final long serialVersionUID = 1L;
 
-        Reference(OperationCallback initialValue) {
+        Reference(final OperationCallback initialValue) {
             super(initialValue);
         }
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextTest.java
deleted file mode 100644 (file)
index 6eedc7d..0000000
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-import akka.actor.ActorRef;
-import akka.actor.Status.Failure;
-import akka.dispatch.ExecutionContexts;
-import akka.dispatch.OnComplete;
-import akka.testkit.javadsl.TestKit;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.FrontendType;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.config.Configuration;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Test whether RmoteTransactionContext operates correctly.
- */
-public class RemoteTransactionContextTest extends AbstractActorTest {
-    private static final TransactionIdentifier TX_ID = new TransactionIdentifier(new LocalHistoryIdentifier(
-        ClientIdentifier.create(FrontendIdentifier.create(MemberName.forName("test"), FrontendType.forName("test")), 0),
-        0), 0);
-
-    private OperationLimiter limiter;
-    private RemoteTransactionContext txContext;
-    private ActorUtils actorUtils;
-    private TestKit kit;
-
-    @Before
-    public void before() {
-        kit = new TestKit(getSystem());
-        actorUtils = Mockito.spy(new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
-            mock(Configuration.class)));
-        limiter = new OperationLimiter(TX_ID, 4, 0);
-        txContext = new RemoteTransactionContext(TX_ID, actorUtils.actorSelection(kit.getRef().path()), actorUtils,
-            DataStoreVersions.CURRENT_VERSION, limiter);
-        txContext.operationHandOffComplete();
-    }
-
-    /**
-     * OperationLimiter should be correctly released when a failure, like AskTimeoutException occurs. Future reads
-     * need to complete immediately with the failure and modifications should not be throttled and thrown away
-     * immediately.
-     */
-    @Test
-    public void testLimiterOnFailure() throws TimeoutException, InterruptedException {
-        txContext.executeDelete(null, null);
-        txContext.executeDelete(null, null);
-        assertEquals(2, limiter.availablePermits());
-
-        final Future<Object> sendFuture = txContext.sendBatchedModifications();
-        assertEquals(2, limiter.availablePermits());
-
-        BatchedModifications msg = kit.expectMsgClass(BatchedModifications.class);
-        assertEquals(2, msg.getModifications().size());
-        assertEquals(1, msg.getTotalMessagesSent());
-        sendReply(new Failure(new NullPointerException()));
-        assertFuture(sendFuture, new OnComplete<>() {
-            @Override
-            public void onComplete(final Throwable failure, final Object success) {
-                assertTrue(failure instanceof NullPointerException);
-                assertEquals(4, limiter.availablePermits());
-
-                // The transaction has failed, no throttling should occur
-                txContext.executeDelete(null, null);
-                assertEquals(4, limiter.availablePermits());
-
-                // Executing a read should result in immediate failure
-                final SettableFuture<Boolean> readFuture = SettableFuture.create();
-                txContext.executeRead(new DataExists(), readFuture, null);
-                assertTrue(readFuture.isDone());
-                try {
-                    readFuture.get();
-                    fail("Read future did not fail");
-                } catch (ExecutionException | InterruptedException e) {
-                    assertTrue(e.getCause() instanceof NullPointerException);
-                }
-            }
-        });
-
-        final Future<Object> commitFuture = txContext.directCommit(null);
-
-        msg = kit.expectMsgClass(BatchedModifications.class);
-        // Modification should have been thrown away by the dropped transmit induced by executeRead()
-        assertEquals(0, msg.getModifications().size());
-        assertTrue(msg.isDoCommitOnReady());
-        assertTrue(msg.isReady());
-        assertEquals(2, msg.getTotalMessagesSent());
-        sendReply(new Failure(new IllegalStateException()));
-        assertFuture(commitFuture, new OnComplete<>() {
-            @Override
-            public void onComplete(final Throwable failure, final Object success) {
-                assertTrue(failure instanceof IllegalStateException);
-            }
-        });
-
-        kit.expectNoMessage();
-    }
-
-    /**
-     * OperationLimiter gives up throttling at some point -- {@link RemoteTransactionContext} needs to deal with that
-     * case, too.
-     */
-    @Test
-    public void testLimiterOnOverflowFailure() throws TimeoutException, InterruptedException {
-        txContext.executeDelete(null, null);
-        txContext.executeDelete(null, null);
-        txContext.executeDelete(null, null);
-        txContext.executeDelete(null, null);
-        assertEquals(0, limiter.availablePermits());
-        txContext.executeDelete(null, null);
-        // Last acquire should have failed ...
-        assertEquals(0, limiter.availablePermits());
-
-        final Future<Object> future = txContext.sendBatchedModifications();
-        assertEquals(0, limiter.availablePermits());
-
-        BatchedModifications msg = kit.expectMsgClass(BatchedModifications.class);
-        // ... so we are sending 5 modifications ...
-        assertEquals(5, msg.getModifications().size());
-        assertEquals(1, msg.getTotalMessagesSent());
-        sendReply(new Failure(new NullPointerException()));
-
-        assertFuture(future, new OnComplete<>() {
-            @Override
-            public void onComplete(final Throwable failure, final Object success) {
-                assertTrue(failure instanceof NullPointerException);
-                // ... but they account for only 4 permits.
-                assertEquals(4, limiter.availablePermits());
-            }
-        });
-
-        kit.expectNoMessage();
-    }
-
-    private void sendReply(final Object message) {
-        final ActorRef askActor = kit.getLastSender();
-        kit.watch(askActor);
-        kit.reply(new Failure(new IllegalStateException()));
-        kit.expectTerminated(askActor);
-    }
-
-    private static void assertFuture(final Future<Object> future, final OnComplete<Object> complete)
-            throws TimeoutException, InterruptedException {
-        Await.ready(future, FiniteDuration.apply(3, TimeUnit.SECONDS));
-        future.onComplete(complete, ExecutionContexts.fromExecutor(MoreExecutors.directExecutor()));
-    }
-}
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
@@ -31,6 +32,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.common.Empty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,9 +41,11 @@ import scala.concurrent.Future;
 /**
  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies.
  */
-public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
-
+@Deprecated(since = "9.0.0", forRemoval = true)
+final class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort {
     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
+    private static final @NonNull ListenableFuture<Empty> IMMEDIATE_EMPTY_SUCCESS =
+        Futures.immediateFuture(Empty.value());
 
     private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
         @Override
@@ -67,13 +71,36 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
         }
     };
 
+    private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
+        @Override
+        public void run() {
+        }
+
+        @Override
+        public void success() {
+        }
+
+        @Override
+        public void failure() {
+        }
+
+        @Override
+        public void pause() {
+        }
+
+        @Override
+        public void resume() {
+        }
+    };
+
+
     private final ActorUtils actorUtils;
     private final List<CohortInfo> cohorts;
     private final SettableFuture<Empty> cohortsResolvedFuture = SettableFuture.create();
     private final TransactionIdentifier transactionId;
     private volatile OperationCallback commitOperationCallback;
 
-    public ThreePhaseCommitCohortProxy(final ActorUtils actorUtils, final List<CohortInfo> cohorts,
+    ThreePhaseCommitCohortProxy(final ActorUtils actorUtils, final List<CohortInfo> cohorts,
             final TransactionIdentifier transactionId) {
         this.actorUtils = actorUtils;
         this.cohorts = cohorts;
@@ -242,13 +269,13 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
         // it's the original exception that is the root cause and of more interest to the client.
 
         return operation("abort", Empty.value(), ABORT_MESSAGE_SUPPLIER, AbortTransactionReply.class, false,
-            OperationCallback.NO_OP_CALLBACK);
+            NO_OP_CALLBACK);
     }
 
     @Override
     public ListenableFuture<? extends CommitInfo> commit() {
         OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
-            OperationCallback.NO_OP_CALLBACK;
+            NO_OP_CALLBACK;
 
         return operation("commit", CommitInfo.empty(), COMMIT_MESSAGE_SUPPLIER, CommitTransactionReply.class, true,
             operationCallback);
@@ -355,16 +382,6 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
         }, actorUtils.getClientDispatcher());
     }
 
-    @Override
-    List<Future<ActorSelection>> getCohortFutures() {
-        List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
-        for (CohortInfo info: cohorts) {
-            cohortFutures.add(info.getActorFuture());
-        }
-
-        return cohortFutures;
-    }
-
     static class CohortInfo {
         private final Future<ActorSelection> actorFuture;
         private final Supplier<Short> actorVersionSupplier;
index 45319712ae87bfd3c30cc9704bddf66d4e25fea7..e2b3872d864bc9b1f9b542efe9a5596dfbb8e4dd 100644 (file)
@@ -51,6 +51,7 @@ import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutur
 import org.opendaylight.controller.cluster.raft.TestActorFactory;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
+@Deprecated(since = "9.0.0", forRemoval = true)
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     static class TestException extends RuntimeException {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
deleted file mode 100644 (file)
index 37cdc4a..0000000
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
-import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
-
-import akka.actor.ActorRef;
-import akka.util.Timeout;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-import org.junit.Assert;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import scala.concurrent.Promise;
-
-public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
-    private LocalHistoryIdentifier historyId;
-
-    @Override
-    public void setUp() {
-        super.setUp();
-        historyId = MockIdentifiers.historyIdentifier(TransactionChainProxyTest.class, memberName);
-    }
-
-    @SuppressWarnings("resource")
-    @Test
-    public void testNewReadOnlyTransaction() {
-
-        DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory, historyId).newReadOnlyTransaction();
-        Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
-
-    }
-
-    @SuppressWarnings("resource")
-    @Test
-    public void testNewReadWriteTransaction() {
-        DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory, historyId).newReadWriteTransaction();
-        Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
-
-    }
-
-    @SuppressWarnings("resource")
-    @Test
-    public void testNewWriteOnlyTransaction() {
-        DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory, historyId).newWriteOnlyTransaction();
-        Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
-
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testClose() {
-        new TransactionChainProxy(mockComponentFactory, historyId).close();
-
-        verify(mockActorContext, times(1)).broadcast(any(Function.class), any(Class.class));
-    }
-
-    @Test
-    public void testRateLimitingUsedInReadWriteTxCreation() {
-        try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
-
-            txChainProxy.newReadWriteTransaction();
-
-            verify(mockActorContext, times(1)).acquireTxCreationPermit();
-        }
-    }
-
-    @Test
-    public void testRateLimitingUsedInWriteOnlyTxCreation() {
-        try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
-
-            txChainProxy.newWriteOnlyTransaction();
-
-            verify(mockActorContext, times(1)).acquireTxCreationPermit();
-        }
-    }
-
-    @Test
-    public void testRateLimitingNotUsedInReadOnlyTxCreation() {
-        try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
-
-            txChainProxy.newReadOnlyTransaction();
-
-            verify(mockActorContext, times(0)).acquireTxCreationPermit();
-        }
-    }
-
-    /**
-     * Tests 2 successive chained write-only transactions and verifies the second transaction isn't
-     * initiated until the first one completes its read future.
-     */
-    @Test
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    public void testChainedWriteOnlyTransactions() throws Exception {
-        dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
-
-        try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
-
-            ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
-
-            Promise<Object> batchedReplyPromise1 = akka.dispatch.Futures.promise();
-            doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
-                    eq(actorSelection(txActorRef1)), isA(BatchedModifications.class), any(Timeout.class));
-
-            DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
-
-            NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-            writeTx1.write(TestModel.TEST_PATH, writeNode1);
-
-            writeTx1.ready();
-
-            verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
-            verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
-
-            ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
-
-            expectBatchedModifications(txActorRef2, 1);
-
-            final NormalizedNode writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
-
-            final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction();
-
-            final AtomicReference<Exception> caughtEx = new AtomicReference<>();
-            final CountDownLatch write2Complete = new CountDownLatch(1);
-            new Thread(() -> {
-                try {
-                    writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
-                } catch (Exception e) {
-                    caughtEx.set(e);
-                } finally {
-                    write2Complete.countDown();
-                }
-            }).start();
-
-            assertTrue("Tx 2 write should've completed", write2Complete.await(5, TimeUnit.SECONDS));
-
-            if (caughtEx.get() != null) {
-                throw caughtEx.get();
-            }
-
-            try {
-                verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-            } catch (AssertionError e) {
-                fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
-            }
-
-            batchedReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
-
-            // Tx 2 should've proceeded to find the primary shard.
-            verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(
-                    eq(DefaultShardStrategy.DEFAULT_SHARD));
-        }
-    }
-
-    /**
-     * Tests 2 successive chained read-write transactions and verifies the second transaction isn't
-     * initiated until the first one completes its read future.
-     */
-    @Test
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    public void testChainedReadWriteTransactions() throws Exception {
-        try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
-
-            ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
-            expectBatchedModifications(txActorRef1, 1);
-
-            Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
-            doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
-                    eq(actorSelection(txActorRef1)), isA(BatchedModifications.class), any(Timeout.class));
-
-            DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
-
-            NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-            writeTx1.write(TestModel.TEST_PATH, writeNode1);
-
-            writeTx1.ready();
-
-            verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
-
-            String tx2MemberName = "mock-member";
-            ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
-            ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
-                    DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2);
-
-            expectBatchedModifications(txActorRef2, 1);
-
-            final NormalizedNode writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
-
-            final DOMStoreWriteTransaction writeTx2 = txChainProxy.newReadWriteTransaction();
-
-            final AtomicReference<Exception> caughtEx = new AtomicReference<>();
-            final CountDownLatch write2Complete = new CountDownLatch(1);
-            new Thread(() -> {
-                try {
-                    writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
-                } catch (Exception e) {
-                    caughtEx.set(e);
-                } finally {
-                    write2Complete.countDown();
-                }
-            }).start();
-
-            assertTrue("Tx 2 write should've completed", write2Complete.await(5, TimeUnit.SECONDS));
-
-            if (caughtEx.get() != null) {
-                throw caughtEx.get();
-            }
-
-            try {
-                verify(mockActorContext, never()).executeOperationAsync(
-                        eq(getSystem().actorSelection(shardActorRef2.path())),
-                        eqCreateTransaction(tx2MemberName, READ_WRITE));
-            } catch (AssertionError e) {
-                fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
-            }
-
-            readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
-
-            verify(mockActorContext, timeout(5000)).executeOperationAsync(
-                    eq(getSystem().actorSelection(shardActorRef2.path())),
-                    eqCreateTransaction(tx2MemberName, READ_WRITE), any(Timeout.class));
-        }
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testChainedWriteTransactionsWithPreviousTxNotReady() {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
-        expectBatchedModifications(actorRef, 1);
-
-        try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
-
-            DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
-
-            NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-            writeTx1.write(TestModel.TEST_PATH, writeNode1);
-
-            txChainProxy.newWriteOnlyTransaction();
-        }
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
deleted file mode 100644 (file)
index a7de0d2..0000000
+++ /dev/null
@@ -1,1516 +0,0 @@
-/*
- * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
-import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
-import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.dispatch.Futures;
-import akka.util.Timeout;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableSortedSet;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FluentFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.SortedSet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.InOrder;
-import org.mockito.Mockito;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.config.Configuration;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
-import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
-import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
-import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
-import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
-import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
-import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.mdsal.common.api.ReadFailedException;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
-import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import scala.concurrent.Promise;
-
-@SuppressWarnings({"resource", "checkstyle:IllegalThrows", "checkstyle:AvoidHidingCauseException"})
-public class TransactionProxyTest extends AbstractTransactionProxyTest {
-
-    @SuppressWarnings("serial")
-    static class TestException extends RuntimeException {
-    }
-
-    interface Invoker {
-        FluentFuture<?> invoke(TransactionProxy proxy);
-    }
-
-    @Test
-    public void testRead() throws Exception {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
-        doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
-        assertEquals(Optional.empty(), transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS));
-
-        NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
-        assertEquals(Optional.of(expectedNode), transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS));
-    }
-
-    @Test(expected = ReadFailedException.class)
-    public void testReadWithInvalidReplyMessageType() throws Throwable {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
-
-        doReturn(Futures.successful(new Object())).when(mockActorContext)
-                .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
-        try {
-            transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
-        } catch (ExecutionException e) {
-            throw e.getCause();
-        }
-    }
-
-    @Test(expected = TestException.class)
-    public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
-
-        doReturn(Futures.failed(new TestException())).when(mockActorContext)
-                .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
-        propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
-    }
-
-    private void testExceptionOnInitialCreateTransaction(final Exception exToThrow, final Invoker invoker)
-            throws Throwable {
-        ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
-
-        if (exToThrow instanceof PrimaryNotFoundException) {
-            doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
-        } else {
-            doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
-                    .findPrimaryShardAsync(anyString());
-        }
-
-        doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
-                any(ActorSelection.class), any(), any(Timeout.class));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
-        propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
-    }
-
-    private void testReadWithExceptionOnInitialCreateTransaction(final Exception exToThrow) throws Throwable {
-        testExceptionOnInitialCreateTransaction(exToThrow, proxy -> proxy.read(TestModel.TEST_PATH));
-    }
-
-    @Test(expected = PrimaryNotFoundException.class)
-    public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
-        testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
-    }
-
-    @Test(expected = TestException.class)
-    public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
-        testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
-                new TestException()));
-    }
-
-    @Test(expected = TestException.class)
-    public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
-        testReadWithExceptionOnInitialCreateTransaction(new TestException());
-    }
-
-    @Test
-    public void testReadWithPriorRecordingOperationSuccessful() throws Exception {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
-        NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        expectBatchedModifications(actorRef, 1);
-
-        doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
-        transactionProxy.write(TestModel.TEST_PATH, expectedNode);
-
-        assertEquals(Optional.of(expectedNode), transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS));
-
-        InOrder inOrder = Mockito.inOrder(mockActorContext);
-        inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
-
-        inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-    }
-
-    @Test
-    public void testReadPreConditionCheck() {
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-        assertThrows(IllegalStateException.class, () -> transactionProxy.read(TestModel.TEST_PATH));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testInvalidCreateTransactionReply() throws Throwable {
-        ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
-
-        doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext)
-                .actorSelection(actorRef.path().toString());
-
-        doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
-                .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
-        doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
-            eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
-            any(Timeout.class));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
-        propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
-    }
-
-    @Test
-    public void testExists() throws Exception {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
-        doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
-
-        Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
-
-        assertEquals("Exists response", Boolean.FALSE, exists);
-
-        doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
-
-        exists = transactionProxy.exists(TestModel.TEST_PATH).get();
-
-        assertEquals("Exists response", Boolean.TRUE, exists);
-    }
-
-    @Test(expected = PrimaryNotFoundException.class)
-    public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
-        testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"),
-            proxy -> proxy.exists(TestModel.TEST_PATH));
-    }
-
-    @Test(expected = ReadFailedException.class)
-    public void testExistsWithInvalidReplyMessageType() throws Throwable {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
-
-        doReturn(Futures.successful(new Object())).when(mockActorContext)
-                .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
-        try {
-            transactionProxy.exists(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
-        } catch (ExecutionException e) {
-            throw e.getCause();
-        }
-    }
-
-    @Test(expected = TestException.class)
-    public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
-
-        doReturn(Futures.failed(new TestException())).when(mockActorContext)
-                .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
-        propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
-    }
-
-    @Test
-    public void testExistsWithPriorRecordingOperationSuccessful() throws Exception {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
-        NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        expectBatchedModifications(actorRef, 1);
-
-        doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
-
-        assertEquals("Exists response", Boolean.TRUE, exists);
-
-        InOrder inOrder = Mockito.inOrder(mockActorContext);
-        inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
-
-        inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testExistsPreConditionCheck() {
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-        transactionProxy.exists(TestModel.TEST_PATH);
-    }
-
-    @Test
-    public void testWrite() {
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
-        NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        expectBatchedModifications(actorRef, 1);
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
-    }
-
-    @Test
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    public void testWriteAfterAsyncRead() throws Exception {
-        ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(),
-                DefaultShardStrategy.DEFAULT_SHARD);
-
-        Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
-        doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
-                eq(getSystem().actorSelection(actorRef.path())),
-                eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
-
-        doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
-        expectBatchedModificationsReady(actorRef);
-
-        final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
-        final CountDownLatch readComplete = new CountDownLatch(1);
-        final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
-        com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
-                new  FutureCallback<Optional<NormalizedNode>>() {
-                    @Override
-                    public void onSuccess(final Optional<NormalizedNode> result) {
-                        try {
-                            transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-                        } catch (Exception e) {
-                            caughtEx.set(e);
-                        } finally {
-                            readComplete.countDown();
-                        }
-                    }
-
-                    @Override
-                    public void onFailure(final Throwable failure) {
-                        caughtEx.set(failure);
-                        readComplete.countDown();
-                    }
-                }, MoreExecutors.directExecutor());
-
-        createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
-
-        Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
-
-        final Throwable t = caughtEx.get();
-        if (t != null) {
-            Throwables.propagateIfPossible(t, Exception.class);
-            throw new RuntimeException(t);
-        }
-
-        // This sends the batched modification.
-        transactionProxy.ready();
-
-        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testWritePreConditionCheck() {
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-        transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testWriteAfterReadyPreConditionCheck() {
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
-        transactionProxy.ready();
-
-        transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-    }
-
-    @Test
-    public void testMerge() {
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
-        NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        expectBatchedModifications(actorRef, 1);
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
-        transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
-
-        verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
-    }
-
-    @Test
-    public void testDelete() {
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
-        expectBatchedModifications(actorRef, 1);
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
-        transactionProxy.delete(TestModel.TEST_PATH);
-
-        verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
-    }
-
-    @Test
-    public void testReadWrite() {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
-        final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
-        expectBatchedModifications(actorRef, 1);
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
-        transactionProxy.read(TestModel.TEST_PATH);
-
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        transactionProxy.read(TestModel.TEST_PATH);
-
-        transactionProxy.read(TestModel.TEST_PATH);
-
-        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
-        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
-
-        verifyBatchedModifications(batchedModifications.get(0), false,
-                new WriteModification(TestModel.TEST_PATH, nodeToWrite));
-    }
-
-    @Test
-    public void testReadyWithReadWrite() {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
-        final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
-        expectBatchedModificationsReady(actorRef, true);
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
-        transactionProxy.read(TestModel.TEST_PATH);
-
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
-        assertTrue(ready instanceof SingleCommitCohortProxy);
-
-        verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
-
-        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
-        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
-
-        verifyBatchedModifications(batchedModifications.get(0), true, true,
-                new WriteModification(TestModel.TEST_PATH, nodeToWrite));
-
-        assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
-    }
-
-    @Test
-    public void testReadyWithNoModifications() {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
-        doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
-        expectBatchedModificationsReady(actorRef, true);
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
-        transactionProxy.read(TestModel.TEST_PATH);
-
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
-        assertTrue(ready instanceof SingleCommitCohortProxy);
-
-        verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
-
-        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
-        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
-
-        verifyBatchedModifications(batchedModifications.get(0), true, true);
-    }
-
-    @Test
-    public void testReadyWithMultipleShardWrites() {
-        ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
-        ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
-                TestModel.JUNK_QNAME.getLocalName());
-
-        expectBatchedModificationsReady(actorRef1);
-        expectBatchedModificationsReady(actorRef2);
-
-        ActorRef actorRef3 = getSystem().actorOf(Props.create(DoNothingActor.class));
-
-        doReturn(getSystem().actorSelection(actorRef3.path())).when(mockActorContext)
-                .actorSelection(actorRef3.path().toString());
-
-        doReturn(Futures.successful(newPrimaryShardInfo(actorRef3, createDataTree()))).when(mockActorContext)
-                .findPrimaryShardAsync(eq(CarsModel.BASE_QNAME.getLocalName()));
-
-        expectReadyLocalTransaction(actorRef3, false);
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
-        transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
-        transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-        transactionProxy.write(CarsModel.BASE_PATH, ImmutableNodes.containerNode(CarsModel.BASE_QNAME));
-
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
-        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
-        verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
-                actorSelection(actorRef2), actorSelection(actorRef3));
-
-        SortedSet<String> expShardNames =
-                ImmutableSortedSet.of(DefaultShardStrategy.DEFAULT_SHARD,
-                        TestModel.JUNK_QNAME.getLocalName(), CarsModel.BASE_QNAME.getLocalName());
-
-        ArgumentCaptor<BatchedModifications> batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
-        verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef1)), batchedMods.capture(), any(Timeout.class));
-        assertEquals("Participating shards", Optional.of(expShardNames),
-            batchedMods.getValue().getParticipatingShardNames());
-
-        batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
-        verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef2)), batchedMods.capture(), any(Timeout.class));
-        assertEquals("Participating shards", Optional.of(expShardNames),
-            batchedMods.getValue().getParticipatingShardNames());
-
-        ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
-        verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef3)), readyLocalTx.capture(), any(Timeout.class));
-        assertEquals("Participating shards", Optional.of(expShardNames),
-            readyLocalTx.getValue().getParticipatingShardNames());
-    }
-
-    @Test
-    public void testReadyWithWriteOnlyAndLastBatchPending() {
-        dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
-
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
-        NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        expectBatchedModificationsReady(actorRef, true);
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
-        assertTrue(ready instanceof SingleCommitCohortProxy);
-
-        verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
-
-        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
-        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
-
-        verifyBatchedModifications(batchedModifications.get(0), true, true,
-                new WriteModification(TestModel.TEST_PATH, nodeToWrite));
-    }
-
-    @Test
-    public void testReadyWithWriteOnlyAndLastBatchEmpty() {
-        dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
-        NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        expectBatchedModificationsReady(actorRef, true);
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
-        assertTrue(ready instanceof SingleCommitCohortProxy);
-
-        verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
-
-        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
-        assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
-
-        verifyBatchedModifications(batchedModifications.get(0), false,
-                new WriteModification(TestModel.TEST_PATH, nodeToWrite));
-
-        verifyBatchedModifications(batchedModifications.get(1), true, true);
-    }
-
-    @Test
-    public void testReadyWithReplyFailure() {
-        dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
-
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
-        NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        expectFailedBatchedModifications(actorRef);
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
-        transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
-
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
-        assertTrue(ready instanceof SingleCommitCohortProxy);
-
-        verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
-    }
-
-    @Test
-    public void testReadyWithDebugContextEnabled() {
-        dataStoreContextBuilder.transactionDebugContextEnabled(true);
-
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
-        expectBatchedModificationsReady(actorRef, true);
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
-        transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
-        assertTrue(ready instanceof DebugThreePhaseCommitCohort);
-
-        verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
-    }
-
-    @Test
-    public void testReadyWithLocalTransaction() {
-        ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
-
-        doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
-                .actorSelection(shardActorRef.path().toString());
-
-        doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).when(mockActorContext)
-                .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
-        expectReadyLocalTransaction(shardActorRef, true);
-
-        NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-        assertTrue(ready instanceof SingleCommitCohortProxy);
-        verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
-
-        ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
-        verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(shardActorRef)), readyLocalTx.capture(), any(Timeout.class));
-        assertFalse("Participating shards present", readyLocalTx.getValue().getParticipatingShardNames().isPresent());
-    }
-
-    @Test
-    public void testReadyWithLocalTransactionWithFailure() {
-        ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
-
-        doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
-                .actorSelection(shardActorRef.path().toString());
-
-        DataTree mockDataTree = createDataTree();
-        DataTreeModification mockModification = mockDataTree.takeSnapshot().newModification();
-        doThrow(new RuntimeException("mock")).when(mockModification).ready();
-
-        doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).when(mockActorContext)
-                .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
-        expectReadyLocalTransaction(shardActorRef, true);
-
-        NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-        assertTrue(ready instanceof SingleCommitCohortProxy);
-        verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
-    }
-
-    private void testWriteOnlyTxWithFindPrimaryShardFailure(final Exception toThrow) {
-        doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
-        NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
-
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        transactionProxy.delete(TestModel.TEST_PATH);
-
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
-        assertTrue(ready instanceof SingleCommitCohortProxy);
-
-        verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
-    }
-
-    @Test
-    public void testWriteOnlyTxWithPrimaryNotFoundException() {
-        testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
-    }
-
-    @Test
-    public void testWriteOnlyTxWithNotInitializedException() {
-        testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
-    }
-
-    @Test
-    public void testWriteOnlyTxWithNoShardLeaderException() {
-        testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
-    }
-
-    @Test
-    public void testReadyWithInvalidReplyMessageType() {
-        dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
-        ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
-        ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
-                TestModel.JUNK_QNAME.getLocalName());
-
-        doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class));
-
-        expectBatchedModificationsReady(actorRef2);
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
-        transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
-        transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
-        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
-        verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
-                IllegalArgumentException.class);
-    }
-
-    @Test
-    public void testGetIdentifier() {
-        setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
-        Object id = transactionProxy.getIdentifier();
-        assertNotNull("getIdentifier returned null", id);
-        assertTrue("Invalid identifier: " + id, id.toString().contains(memberName));
-    }
-
-    @Test
-    public void testClose() {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
-        doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
-        transactionProxy.read(TestModel.TEST_PATH);
-
-        transactionProxy.close();
-
-        verify(mockActorContext).sendOperationAsync(
-                eq(actorSelection(actorRef)), isA(CloseTransaction.class));
-    }
-
-    private interface TransactionProxyOperation {
-        void run(TransactionProxy transactionProxy);
-    }
-
-    private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef) {
-        return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION);
-    }
-
-    private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef, final DataTree dataTree) {
-        return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
-                dataTree);
-    }
-
-    private void throttleOperation(final TransactionProxyOperation operation) {
-        throttleOperation(operation, 1, true);
-    }
-
-    private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
-            final boolean shardFound) {
-        throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
-                mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
-    }
-
-    private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
-            final boolean shardFound, final long expectedCompletionTime) {
-        ActorSystem actorSystem = getSystem();
-        ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-
-        // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
-        // we now allow one extra permit to be allowed for ready
-        doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2)
-                .shardBatchedModificationCount(outstandingOpsLimit - 1).build()).when(mockActorContext)
-                        .getDatastoreContext();
-
-        doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
-                .actorSelection(shardActorRef.path().toString());
-
-        if (shardFound) {
-            doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
-                    .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-            doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
-                    .findPrimaryShardAsync(eq("cars"));
-
-        } else {
-            doReturn(Futures.failed(new Exception("not found")))
-                    .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-        }
-
-        doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
-                any(Timeout.class));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
-        long start = System.nanoTime();
-
-        operation.run(transactionProxy);
-
-        long end = System.nanoTime();
-
-        Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
-                expectedCompletionTime, end - start),
-                end - start > expectedCompletionTime && end - start < expectedCompletionTime * 2);
-
-    }
-
-    private void completeOperation(final TransactionProxyOperation operation) {
-        completeOperation(operation, true);
-    }
-
-    private void completeOperation(final TransactionProxyOperation operation, final boolean shardFound) {
-        ActorSystem actorSystem = getSystem();
-        ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-
-        doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
-                .actorSelection(shardActorRef.path().toString());
-
-        if (shardFound) {
-            doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
-                    .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-        } else {
-            doReturn(Futures.failed(new PrimaryNotFoundException("test"))).when(mockActorContext)
-                    .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-        }
-
-        ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-        String actorPath = txActorRef.path().toString();
-        CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, nextTransactionId(),
-                DataStoreVersions.CURRENT_VERSION);
-
-        doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
-
-        doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).executeOperationAsync(
-                eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
-                any(Timeout.class));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
-        long start = System.nanoTime();
-
-        operation.run(transactionProxy);
-
-        long end = System.nanoTime();
-
-        long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
-                .getOperationTimeoutInMillis());
-        Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
-                expected, end - start), end - start <= expected);
-    }
-
-    private void completeOperationLocal(final TransactionProxyOperation operation, final DataTree dataTree) {
-        ActorSystem actorSystem = getSystem();
-        ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-
-        doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
-                .actorSelection(shardActorRef.path().toString());
-
-        doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTree))).when(mockActorContext)
-                .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
-        long start = System.nanoTime();
-
-        operation.run(transactionProxy);
-
-        long end = System.nanoTime();
-
-        long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
-                .getOperationTimeoutInMillis());
-        Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", expected, end - start),
-                end - start <= expected);
-    }
-
-    private static DataTree createDataTree() {
-        DataTree dataTree = mock(DataTree.class);
-        DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
-        DataTreeModification dataTreeModification = mock(DataTreeModification.class);
-
-        doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
-        doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
-
-        return dataTree;
-    }
-
-    private static DataTree createDataTree(final NormalizedNode readResponse) {
-        DataTree dataTree = mock(DataTree.class);
-        DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
-        DataTreeModification dataTreeModification = mock(DataTreeModification.class);
-
-        doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
-        doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
-        doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
-
-        return dataTree;
-    }
-
-
-    @Test
-    public void testWriteCompletionForLocalShard() {
-        completeOperationLocal(transactionProxy -> {
-            NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-            transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-            transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        }, createDataTree());
-    }
-
-    @Test
-    public void testWriteThrottlingWhenShardFound() {
-        throttleOperation(transactionProxy -> {
-            NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-            expectIncompleteBatchedModifications();
-
-            transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-            transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-        });
-    }
-
-    @Test
-    public void testWriteThrottlingWhenShardNotFound() {
-        // Confirm that there is no throttling when the Shard is not found
-        completeOperation(transactionProxy -> {
-            NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-            expectBatchedModifications(2);
-
-            transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-            transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-        }, false);
-
-    }
-
-
-    @Test
-    public void testWriteCompletion() {
-        completeOperation(transactionProxy -> {
-            NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-            expectBatchedModifications(2);
-
-            transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-            transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-        });
-    }
-
-    @Test
-    public void testMergeThrottlingWhenShardFound() {
-        throttleOperation(transactionProxy -> {
-            NormalizedNode nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-            expectIncompleteBatchedModifications();
-
-            transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
-
-            transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
-        });
-    }
-
-    @Test
-    public void testMergeThrottlingWhenShardNotFound() {
-        completeOperation(transactionProxy -> {
-            NormalizedNode nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-            expectBatchedModifications(2);
-
-            transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
-
-            transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
-        }, false);
-    }
-
-    @Test
-    public void testMergeCompletion() {
-        completeOperation(transactionProxy -> {
-            NormalizedNode nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-            expectBatchedModifications(2);
-
-            transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
-
-            transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
-        });
-
-    }
-
-    @Test
-    public void testMergeCompletionForLocalShard() {
-        completeOperationLocal(transactionProxy -> {
-            NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-            transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
-
-            transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
-
-        }, createDataTree());
-    }
-
-
-    @Test
-    public void testDeleteThrottlingWhenShardFound() {
-
-        throttleOperation(transactionProxy -> {
-            expectIncompleteBatchedModifications();
-
-            transactionProxy.delete(TestModel.TEST_PATH);
-
-            transactionProxy.delete(TestModel.TEST_PATH);
-        });
-    }
-
-
-    @Test
-    public void testDeleteThrottlingWhenShardNotFound() {
-
-        completeOperation(transactionProxy -> {
-            expectBatchedModifications(2);
-
-            transactionProxy.delete(TestModel.TEST_PATH);
-
-            transactionProxy.delete(TestModel.TEST_PATH);
-        }, false);
-    }
-
-    @Test
-    public void testDeleteCompletionForLocalShard() {
-        completeOperationLocal(transactionProxy -> {
-
-            transactionProxy.delete(TestModel.TEST_PATH);
-
-            transactionProxy.delete(TestModel.TEST_PATH);
-        }, createDataTree());
-
-    }
-
-    @Test
-    public void testDeleteCompletion() {
-        completeOperation(transactionProxy -> {
-            expectBatchedModifications(2);
-
-            transactionProxy.delete(TestModel.TEST_PATH);
-
-            transactionProxy.delete(TestModel.TEST_PATH);
-        });
-
-    }
-
-    @Test
-    public void testReadThrottlingWhenShardFound() {
-
-        throttleOperation(transactionProxy -> {
-            doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                    any(ActorSelection.class), eqReadData());
-
-            transactionProxy.read(TestModel.TEST_PATH);
-
-            transactionProxy.read(TestModel.TEST_PATH);
-        });
-    }
-
-    @Test
-    public void testReadThrottlingWhenShardNotFound() {
-
-        completeOperation(transactionProxy -> {
-            doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                    any(ActorSelection.class), eqReadData());
-
-            transactionProxy.read(TestModel.TEST_PATH);
-
-            transactionProxy.read(TestModel.TEST_PATH);
-        }, false);
-    }
-
-
-    @Test
-    public void testReadCompletion() {
-        completeOperation(transactionProxy -> {
-            NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-            doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
-                    any(ActorSelection.class), eqReadData(), any(Timeout.class));
-
-            transactionProxy.read(TestModel.TEST_PATH);
-
-            transactionProxy.read(TestModel.TEST_PATH);
-        });
-
-    }
-
-    @Test
-    public void testReadCompletionForLocalShard() {
-        final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-        completeOperationLocal(transactionProxy -> {
-            transactionProxy.read(TestModel.TEST_PATH);
-
-            transactionProxy.read(TestModel.TEST_PATH);
-        }, createDataTree(nodeToRead));
-
-    }
-
-    @Test
-    public void testReadCompletionForLocalShardWhenExceptionOccurs() {
-        completeOperationLocal(transactionProxy -> {
-            transactionProxy.read(TestModel.TEST_PATH);
-
-            transactionProxy.read(TestModel.TEST_PATH);
-        }, createDataTree());
-
-    }
-
-    @Test
-    public void testExistsThrottlingWhenShardFound() {
-
-        throttleOperation(transactionProxy -> {
-            doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                    any(ActorSelection.class), eqDataExists());
-
-            transactionProxy.exists(TestModel.TEST_PATH);
-
-            transactionProxy.exists(TestModel.TEST_PATH);
-        });
-    }
-
-    @Test
-    public void testExistsThrottlingWhenShardNotFound() {
-
-        completeOperation(transactionProxy -> {
-            doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                    any(ActorSelection.class), eqDataExists());
-
-            transactionProxy.exists(TestModel.TEST_PATH);
-
-            transactionProxy.exists(TestModel.TEST_PATH);
-        }, false);
-    }
-
-
-    @Test
-    public void testExistsCompletion() {
-        completeOperation(transactionProxy -> {
-            doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
-                    any(ActorSelection.class), eqDataExists(), any(Timeout.class));
-
-            transactionProxy.exists(TestModel.TEST_PATH);
-
-            transactionProxy.exists(TestModel.TEST_PATH);
-        });
-
-    }
-
-    @Test
-    public void testExistsCompletionForLocalShard() {
-        final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-        completeOperationLocal(transactionProxy -> {
-            transactionProxy.exists(TestModel.TEST_PATH);
-
-            transactionProxy.exists(TestModel.TEST_PATH);
-        }, createDataTree(nodeToRead));
-
-    }
-
-    @Test
-    public void testExistsCompletionForLocalShardWhenExceptionOccurs() {
-        completeOperationLocal(transactionProxy -> {
-            transactionProxy.exists(TestModel.TEST_PATH);
-
-            transactionProxy.exists(TestModel.TEST_PATH);
-        }, createDataTree());
-
-    }
-
-    @Test
-    public void testReadyThrottling() {
-
-        throttleOperation(transactionProxy -> {
-            NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-            expectBatchedModifications(1);
-
-            transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-            transactionProxy.ready();
-        });
-    }
-
-    @Test
-    public void testReadyThrottlingWithTwoTransactionContexts() {
-        throttleOperation(transactionProxy -> {
-            NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-            NormalizedNode carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
-
-            expectBatchedModifications(2);
-
-            transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-            // Trying to write to Cars will cause another transaction context to get created
-            transactionProxy.write(CarsModel.BASE_PATH, carsNode);
-
-            // Now ready should block for both transaction contexts
-            transactionProxy.ready();
-        }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
-                .getOperationTimeoutInMillis()) * 2);
-    }
-
-    private void testModificationOperationBatching(final TransactionType type) {
-        int shardBatchedModificationCount = 3;
-        dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
-
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
-
-        expectBatchedModifications(actorRef, shardBatchedModificationCount);
-
-        YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
-        NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
-        NormalizedNode writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
-
-        YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
-        NormalizedNode writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
-
-        YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
-        NormalizedNode mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
-        NormalizedNode mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
-
-        YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
-        NormalizedNode mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
-
-        YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
-        YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
-
-        transactionProxy.write(writePath1, writeNode1);
-        transactionProxy.write(writePath2, writeNode2);
-        transactionProxy.delete(deletePath1);
-        transactionProxy.merge(mergePath1, mergeNode1);
-        transactionProxy.merge(mergePath2, mergeNode2);
-        transactionProxy.write(writePath3, writeNode3);
-        transactionProxy.merge(mergePath3, mergeNode3);
-        transactionProxy.delete(deletePath2);
-
-        // This sends the last batch.
-        transactionProxy.ready();
-
-        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
-        assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
-
-        verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
-                new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
-
-        verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
-                new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
-
-        verifyBatchedModifications(batchedModifications.get(2), true, true,
-                new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
-
-        assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
-    }
-
-    @Test
-    public void testReadWriteModificationOperationBatching() {
-        testModificationOperationBatching(READ_WRITE);
-    }
-
-    @Test
-    public void testWriteOnlyModificationOperationBatching() {
-        testModificationOperationBatching(WRITE_ONLY);
-    }
-
-    @Test
-    public void testOptimizedWriteOnlyModificationOperationBatching() {
-        dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
-        testModificationOperationBatching(WRITE_ONLY);
-    }
-
-    @Test
-    public void testModificationOperationBatchingWithInterleavedReads() throws Exception {
-
-        int shardBatchedModificationCount = 10;
-        dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
-
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
-        expectBatchedModifications(actorRef, shardBatchedModificationCount);
-
-        final YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
-        final NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
-        NormalizedNode writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
-
-        final YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
-        final NormalizedNode mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
-        NormalizedNode mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
-
-        final YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
-
-        doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
-
-        doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
-
-        doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
-        transactionProxy.write(writePath1, writeNode1);
-        transactionProxy.write(writePath2, writeNode2);
-
-        Optional<NormalizedNode> readOptional = transactionProxy.read(writePath2).get(5, TimeUnit.SECONDS);
-
-        assertEquals("Response NormalizedNode", Optional.of(writeNode2), readOptional);
-
-        transactionProxy.merge(mergePath1, mergeNode1);
-        transactionProxy.merge(mergePath2, mergeNode2);
-
-        readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
-
-        transactionProxy.delete(deletePath);
-
-        Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
-        assertEquals("Exists response", Boolean.TRUE, exists);
-
-        assertEquals("Response NormalizedNode", Optional.of(mergeNode2), readOptional);
-
-        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
-        assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
-
-        verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
-                new WriteModification(writePath2, writeNode2));
-
-        verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
-                new MergeModification(mergePath2, mergeNode2));
-
-        verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
-
-        InOrder inOrder = Mockito.inOrder(mockActorContext);
-        inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
-
-        inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
-
-        inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
-
-        inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
-
-        inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
-
-        inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
-    }
-
-    @Test
-    public void testReadRoot() throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
-        EffectiveModelContext schemaContext = SchemaContextHelper.full();
-        Configuration configuration = mock(Configuration.class);
-        doReturn(configuration).when(mockActorContext).getConfiguration();
-        doReturn(schemaContext).when(mockActorContext).getSchemaContext();
-        doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
-
-        NormalizedNode expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-        NormalizedNode expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
-
-        setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
-        setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
-
-        doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
-
-        doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
-        Optional<NormalizedNode> readOptional = transactionProxy.read(
-                YangInstanceIdentifier.of()).get(5, TimeUnit.SECONDS);
-
-        assertTrue("NormalizedNode isPresent", readOptional.isPresent());
-
-        NormalizedNode normalizedNode = readOptional.orElseThrow();
-
-        assertTrue("Expect value to be a Collection", normalizedNode.body() instanceof Collection);
-
-        @SuppressWarnings("unchecked")
-        Collection<NormalizedNode> collection = (Collection<NormalizedNode>) normalizedNode.body();
-
-        for (NormalizedNode node : collection) {
-            assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
-        }
-
-        assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
-                NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
-
-        assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
-
-        assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
-                NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
-
-        assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
-    }
-
-
-    private void setUpReadData(final String shardName, final NormalizedNode expectedNode) {
-        ActorSystem actorSystem = getSystem();
-        ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
-
-        doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
-                .actorSelection(shardActorRef.path().toString());
-
-        doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).when(mockActorContext)
-                .findPrimaryShardAsync(eq(shardName));
-
-        ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-
-        doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext)
-                .actorSelection(txActorRef.path().toString());
-
-        doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION)))
-                .when(mockActorContext).executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
-                        eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
-
-        doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.of()), any(Timeout.class));
-    }
-}
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.datastore;
 
 import com.codahale.metrics.Timer;
@@ -19,7 +18,8 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
  * TransactionRateLimitingCallback computes the new transaction rate limit on the successful completion of a
  * transaction.
  */
-public class TransactionRateLimitingCallback implements OperationCallback {
+@Deprecated(since = "9.0.0", forRemoval = true)
+final class TransactionRateLimitingCallback implements OperationCallback {
     private static Ticker TICKER = Ticker.systemTicker();
 
     private enum State {
@@ -33,7 +33,7 @@ public class TransactionRateLimitingCallback implements OperationCallback {
     private long elapsedTime;
     private volatile State state = State.STOPPED;
 
-    TransactionRateLimitingCallback(ActorUtils actorUtils) {
+    TransactionRateLimitingCallback(final ActorUtils actorUtils) {
         commitTimer = actorUtils.getOperationTimer(ActorUtils.COMMIT);
     }
 
@@ -75,7 +75,7 @@ public class TransactionRateLimitingCallback implements OperationCallback {
     }
 
     @VisibleForTesting
-    static void setTicker(Ticker ticker) {
+    static void setTicker(final Ticker ticker) {
         TICKER = ticker;
     }
 }
index 14783441b77e96ac837a784fbe2f923aa1ffbb86..b37dfb545064936ea4936671c8c07d93bf9f1122 100644 (file)
@@ -29,6 +29,7 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
  *
  * @author Thomas Pantelis
  */
+@Deprecated(since = "9.0.0", forRemoval = true)
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class TransactionRateLimitingCallbackTest {
     @Mock